Получение списка файлов в удалённом репозитории

от автора

Как‐то понадобился мне просмотр списка файлов в удалённом репозитории. Клонировать его при этом как‐то не очень хотелось. Поиск в интернете ожидаемо дал множество ответов вида «это невозможно, делайте клон». А мне‐то надо всего‐навсего убедиться, что по некоторой ссылке находится репозиторий, соответствующий некоторому архиву с исходными кодами. Так как «некоторая ссылка» находится на странице с описанием содержимого этого архива (точнее, дополнения в этом архиве), то мне показалось достаточным сравнить только список файлов. Как быть?
Конечно, Mercurial не предоставляет практически никаких возможностей работы с удалённым репозиторием. Точнее, можно сделать push и pull (ну и clone как частный случай последнего). Но можно ли сделать pull, не затрагивая при этом файловую систему? Ответ: можно, здесь нам поможет hg incoming. Собственно, алгоритм работы такой:

  1. Создать где‐то новый пустой репозиторий. В пустой репозиторий можно делать pull из любого репозитория.
  2. Используя hg incoming получить список изменений. Так как hg incoming использует те же функции, что и hg log, то мы не ограничены в возможностях изменения его вывода. В частности, можно получить список всех файлов, изменённых в каждой из ревизий, или даже сами изменения в формате unified diff (с расширениями git для бинарных файлов). Diff нам не нужен, а вот список всех изменённых файлов — пригодится.
  3. Так как мы получаем все ревизии, то по ходу дела можно в дополнение к списку родилей к каждому изменению присоединить и список детей. Отсутствие детей, которые не являются предками ревизии, список файлов в которой нас интересует, нас не волнует.
  4. У mercurial есть одна ревизия, которая обязательно присутствует в любом репозитории и при том является единственной, реально не имеющей ни одного родителя: -1:0000000000000000000000000000000000000000. Это хорошая начальная точка.
    Начиная с данной ревизии найдём список файлов во всех остальных ревизиях (список файлов в начальной ревизии известен: он пуст). Для этого
    1. Для каждой ревизии, кроме начальной, возьмём список файлов из первого родителя. Ревизии обходятся от родителей к детям.
    2. Добавим в этот список список добавленных файлов (его вы получите, если используете hg incoming --format xml --verbose: в тёге paths).
    3. Удалим из этого списка список удалённых файлов (получается там же).

  5. Теперь найдём ревизию, не имеющую ни одного потомка. Это и будет ревизия, запрошенная с помощью hg incoming --rev revspec. Найдя эту ревизию, выведем список файлов в ней.

Замечу, что вывод hg incoming с форматом по‐умолчанию невозможно использовать для такой операции. Надо либо писать свой шаблон с {file_adds}, {file_mods} и {file_dels}, либо взять готовый: --format xml. Ключ --template вам здесь не поможет. Написание своего формата сильно сократит код по сравнению с использованием sax парсера для XML, но я предпочёл взять --format xml.

Собственно, сам код

#!/usr/bin/env python # vim: fileencoding=utf-8  from __future__ import unicode_literals, division from xml import sax from subprocess import check_call, Popen, PIPE from shutil import rmtree from tempfile import mkdtemp   class MercurialRevision(object):     __slots__ = ('rev', 'hex',                  'tags', 'bookmarks', 'branch',                  'parents', 'children',                  'added', 'removed', 'modified',                  'copies',                  'files',)      def __init__(self, rev, hex):         self.rev = rev         self.hex = hex         self.parents = []         self.children = []         self.added = set()         self.removed = set()         self.modified = set()         self.copies = {}         self.tags = set()         self.bookmarks = set()         self.branch = None         self.files = set()      def __str__(self):         return '<revision>'.format(hex=self.hex, rev=self.rev)      def __repr__(self):         return '{0}({rev!r}, {hex!r})'.format(self.__class__.__name__, hex=self.hex, rev=self.rev)      def __hash__(self):         return int(self.hex, 16)   class MercurialHandler(sax.handler.ContentHandler):     def startDocument(self):         self.curpath = []         self.currev = None         nullrev = MercurialRevision(-1, '0' * 40)         self.revisions_rev = {nullrev.rev : nullrev}         self.revisions_hex = {nullrev.hex : nullrev}         self.tags = {}         self.bookmarks = {}         self.characters_fun = None         self.last_data = None      def add_tag(self, tag):         self.currev.tags.add(tag)         self.tags[tag] = self.currev      def add_bookmark(self, bookmark):         self.currev.bookmarks.add(bookmark)         self.bookmarks[bookmark] = self.currev      def characters(self, data):         if self.characters_fun:             if not self.last_data:                 self.last_data = data             else:                 self.last_data += data      def startElement(self, name, attributes):         if name == 'log':             assert not self.curpath             assert not self.currev         elif name == 'logentry':             assert self.curpath == ['log']             assert not self.currev             self.currev = MercurialRevision(int(attributes['revision']), attributes['node'])         else:             assert self.currev             if name == 'tag':                 assert self.curpath[-1] == 'logentry'                 self.characters_fun = self.add_tag             elif name == 'bookmark':                 assert self.curpath[-1] == 'logentry'                 self.characters_fun = self.add_bookmark             elif name == 'parent':                 assert self.curpath[-1] == 'logentry'                 self.currev.parents.append(self.revisions_hex[attributes['node']])             elif name == 'branch':                 assert self.curpath[-1] == 'logentry'                 self.characters_fun = lambda branch: self.currev.__setattr__('branch', branch)             elif name == 'path':                 assert self.curpath[-1] == 'paths'                 if attributes['action'] == 'M':                     self.characters_fun = self.currev.modified.add                 elif attributes['action'] == 'A':                     self.characters_fun = self.currev.added.add                 elif attributes['action'] == 'R':                     self.characters_fun = self.currev.removed.add             elif name == 'copy':                 assert self.curpath[-1] == 'copies'                 self.characters_fun = (lambda destination, source=attributes['source']:                         self.currev.copies.__setitem__(source, destination))         self.curpath.append(name)      def endElement(self, name):         assert self.curpath or self.curpath[-1] == ['log']         assert self.curpath[-1] == name         if name == 'logentry':             if not self.currev.parents:                 self.currev.parents.append(self.revisions_rev[self.currev.rev - 1])             for parent in self.currev.parents:                 parent.children.append(self.currev)             self.revisions_hex[self.currev.hex] = self.currev             self.revisions_rev[self.currev.rev] = self.currev             self.currev = None         if self.last_data is None:             if self.characters_fun:                 self.characters_fun('')         else:             assert self.characters_fun             self.characters_fun(self.last_data)             self.characters_fun = None             self.last_data = None         self.curpath.pop()      def export_result(self):         heads = {revision for revision in self.revisions_hex.values()                  if not revision.children                     or all(child.branch != revision.branch for child in revision.children)}         # heads contains the same revisions as `hg heads --closed`         tips = {head for head in heads if not head.children}         return {             'heads': heads,             'tips': tips,             'tags': self.tags,             'bookmarks': self.bookmarks,             'revisions_hex': self.revisions_hex,             'revisions_rev': self.revisions_rev,             'root': self.revisions_rev[-1],         }  class MercurialRemoteParser(object):     __slots__ = ('parser', 'handler', 'tmpdir')      def __init__(self, tmpdir=None):         self.parser = sax.make_parser()         self.handler = MercurialHandler()         self.parser.setContentHandler(self.handler)         self.tmpdir = tmpdir or mkdtemp(suffix='.hg')         self.init_tmpdir()      def init_tmpdir(self):         check_call(['hg', 'init', self.tmpdir])      def delete_tmpdir(self):         if self.tmpdir and rmtree:             rmtree(self.tmpdir)      __del__ = delete_tmpdir      def __enter__(self):         return self      def __exit__(self, *args, **kwargs):         self.delete_tmpdir()      @staticmethod     def generate_files(parsing_result):         toprocess = [parsing_result['root']]         processed = set()         while toprocess:             revision = toprocess.pop(0)             if revision.parents:                 # Inherit files from the first parent                 assert not revision.files                 if revision.parents[0] not in processed:                     assert toprocess                     toprocess.append(revision)                     continue                 revision.files.update(revision.parents[0].files)                 # Then apply delta found in log                 assert not (revision.files & revision.added)                 revision.files.update(revision.added)                 assert revision.files > revision.removed                 revision.files -= revision.removed                 assert revision.files > revision.modified, (                         'Expected to find the following files: ' + ','.join(                             file for file in revision.modified if not file in revision.files))             processed.add(revision)             toprocess.extend(child for child in revision.children                              if not child in processed and not child in toprocess)         assert set(parsing_result['revisions_rev'].values()) == processed         return parsing_result      def parse_url(self, url, rev_name=None):         p = Popen(['hg', '--repository', self.tmpdir,                          'incoming', '--style', 'xml', '--verbose', url,                   ] + (['--rev', rev_name] if rev_name else []),                   stdout=PIPE)         p.stdout.readline()  # Skip “comparing with {url}” header         self.parser.parse(p.stdout)         parsing_result = self.handler.export_result()         self.generate_files(parsing_result)         return parsing_result   if __name__ == '__main__':     import sys      def print_files(revision):         for file in revision.files:             print file      remote_url = sys.argv[1]     rev_name = sys.argv[2]      with MercurialRemoteParser() as remote_parser:         parsing_result = remote_parser.parse_url(remote_url, rev_name=rev_name)         assert len(parsing_result['tips']) == 1, 'Found more then one head'         print_files(next(iter(parsing_result['tips'])))  # vim: tw=100 ft=python ts=4 sts=4 sw=4 </revision>

Использование: python -O list_hg_files.py https://bitbucket.org/ZyX_I/aurum tip. Оба аргумента (URL удалённого репозитория и обозначение ревизии) обязательны.

ссылка на оригинал статьи http://habrahabr.ru/post/197312/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *