Как‐то понадобился мне просмотр списка файлов в удалённом репозитории. Клонировать его при этом как‐то не очень хотелось. Поиск в интернете ожидаемо дал множество ответов вида «это невозможно, делайте клон». А мне‐то надо всего‐навсего убедиться, что по некоторой ссылке находится репозиторий, соответствующий некоторому архиву с исходными кодами. Так как «некоторая ссылка» находится на странице с описанием содержимого этого архива (точнее, дополнения в этом архиве), то мне показалось достаточным сравнить только список файлов. Как быть?
Конечно, Mercurial не предоставляет практически никаких возможностей работы с удалённым репозиторием. Точнее, можно сделать push и pull (ну и clone как частный случай последнего). Но можно ли сделать pull, не затрагивая при этом файловую систему? Ответ: можно, здесь нам поможет
Конечно, Mercurial не предоставляет практически никаких возможностей работы с удалённым репозиторием. Точнее, можно сделать push и pull (ну и clone как частный случай последнего). Но можно ли сделать pull, не затрагивая при этом файловую систему? Ответ: можно, здесь нам поможет
hg incoming
. Собственно, алгоритм работы такой:- Создать где‐то новый пустой репозиторий. В пустой репозиторий можно делать
pull
из любого репозитория. - Используя
hg incoming
получить список изменений. Так какhg incoming
использует те же функции, что иhg log
, то мы не ограничены в возможностях изменения его вывода. В частности, можно получить список всех файлов, изменённых в каждой из ревизий, или даже сами изменения в форматеunified diff
(с расширениямиgit
для бинарных файлов). Diff нам не нужен, а вот список всех изменённых файлов — пригодится. - Так как мы получаем все ревизии, то по ходу дела можно в дополнение к списку родилей к каждому изменению присоединить и список детей. Отсутствие детей, которые не являются предками ревизии, список файлов в которой нас интересует, нас не волнует.
- У mercurial есть одна ревизия, которая обязательно присутствует в любом репозитории и при том является единственной, реально не имеющей ни одного родителя:
-1:0000000000000000000000000000000000000000
. Это хорошая начальная точка.
Начиная с данной ревизии найдём список файлов во всех остальных ревизиях (список файлов в начальной ревизии известен: он пуст). Для этого
- Для каждой ревизии, кроме начальной, возьмём список файлов из первого родителя. Ревизии обходятся от родителей к детям.
- Добавим в этот список список добавленных файлов (его вы получите, если используете
hg incoming --format xml --verbose
: в тёгеpaths
). - Удалим из этого списка список удалённых файлов (получается там же).
- Теперь найдём ревизию, не имеющую ни одного потомка. Это и будет ревизия, запрошенная с помощью
hg incoming --rev revspec
. Найдя эту ревизию, выведем список файлов в ней.
Замечу, что вывод hg incoming
с форматом по‐умолчанию невозможно использовать для такой операции. Надо либо писать свой шаблон с {file_adds}
, {file_mods}
и {file_dels}
, либо взять готовый: --format xml
. Ключ --template
вам здесь не поможет. Написание своего формата сильно сократит код по сравнению с использованием sax парсера для XML, но я предпочёл взять --format xml
.
Собственно, сам код
#!/usr/bin/env python # vim: fileencoding=utf-8 from __future__ import unicode_literals, division from xml import sax from subprocess import check_call, Popen, PIPE from shutil import rmtree from tempfile import mkdtemp class MercurialRevision(object): __slots__ = ('rev', 'hex', 'tags', 'bookmarks', 'branch', 'parents', 'children', 'added', 'removed', 'modified', 'copies', 'files',) def __init__(self, rev, hex): self.rev = rev self.hex = hex self.parents = [] self.children = [] self.added = set() self.removed = set() self.modified = set() self.copies = {} self.tags = set() self.bookmarks = set() self.branch = None self.files = set() def __str__(self): return '<revision>'.format(hex=self.hex, rev=self.rev) def __repr__(self): return '{0}({rev!r}, {hex!r})'.format(self.__class__.__name__, hex=self.hex, rev=self.rev) def __hash__(self): return int(self.hex, 16) class MercurialHandler(sax.handler.ContentHandler): def startDocument(self): self.curpath = [] self.currev = None nullrev = MercurialRevision(-1, '0' * 40) self.revisions_rev = {nullrev.rev : nullrev} self.revisions_hex = {nullrev.hex : nullrev} self.tags = {} self.bookmarks = {} self.characters_fun = None self.last_data = None def add_tag(self, tag): self.currev.tags.add(tag) self.tags[tag] = self.currev def add_bookmark(self, bookmark): self.currev.bookmarks.add(bookmark) self.bookmarks[bookmark] = self.currev def characters(self, data): if self.characters_fun: if not self.last_data: self.last_data = data else: self.last_data += data def startElement(self, name, attributes): if name == 'log': assert not self.curpath assert not self.currev elif name == 'logentry': assert self.curpath == ['log'] assert not self.currev self.currev = MercurialRevision(int(attributes['revision']), attributes['node']) else: assert self.currev if name == 'tag': assert self.curpath[-1] == 'logentry' self.characters_fun = self.add_tag elif name == 'bookmark': assert self.curpath[-1] == 'logentry' self.characters_fun = self.add_bookmark elif name == 'parent': assert self.curpath[-1] == 'logentry' self.currev.parents.append(self.revisions_hex[attributes['node']]) elif name == 'branch': assert self.curpath[-1] == 'logentry' self.characters_fun = lambda branch: self.currev.__setattr__('branch', branch) elif name == 'path': assert self.curpath[-1] == 'paths' if attributes['action'] == 'M': self.characters_fun = self.currev.modified.add elif attributes['action'] == 'A': self.characters_fun = self.currev.added.add elif attributes['action'] == 'R': self.characters_fun = self.currev.removed.add elif name == 'copy': assert self.curpath[-1] == 'copies' self.characters_fun = (lambda destination, source=attributes['source']: self.currev.copies.__setitem__(source, destination)) self.curpath.append(name) def endElement(self, name): assert self.curpath or self.curpath[-1] == ['log'] assert self.curpath[-1] == name if name == 'logentry': if not self.currev.parents: self.currev.parents.append(self.revisions_rev[self.currev.rev - 1]) for parent in self.currev.parents: parent.children.append(self.currev) self.revisions_hex[self.currev.hex] = self.currev self.revisions_rev[self.currev.rev] = self.currev self.currev = None if self.last_data is None: if self.characters_fun: self.characters_fun('') else: assert self.characters_fun self.characters_fun(self.last_data) self.characters_fun = None self.last_data = None self.curpath.pop() def export_result(self): heads = {revision for revision in self.revisions_hex.values() if not revision.children or all(child.branch != revision.branch for child in revision.children)} # heads contains the same revisions as `hg heads --closed` tips = {head for head in heads if not head.children} return { 'heads': heads, 'tips': tips, 'tags': self.tags, 'bookmarks': self.bookmarks, 'revisions_hex': self.revisions_hex, 'revisions_rev': self.revisions_rev, 'root': self.revisions_rev[-1], } class MercurialRemoteParser(object): __slots__ = ('parser', 'handler', 'tmpdir') def __init__(self, tmpdir=None): self.parser = sax.make_parser() self.handler = MercurialHandler() self.parser.setContentHandler(self.handler) self.tmpdir = tmpdir or mkdtemp(suffix='.hg') self.init_tmpdir() def init_tmpdir(self): check_call(['hg', 'init', self.tmpdir]) def delete_tmpdir(self): if self.tmpdir and rmtree: rmtree(self.tmpdir) __del__ = delete_tmpdir def __enter__(self): return self def __exit__(self, *args, **kwargs): self.delete_tmpdir() @staticmethod def generate_files(parsing_result): toprocess = [parsing_result['root']] processed = set() while toprocess: revision = toprocess.pop(0) if revision.parents: # Inherit files from the first parent assert not revision.files if revision.parents[0] not in processed: assert toprocess toprocess.append(revision) continue revision.files.update(revision.parents[0].files) # Then apply delta found in log assert not (revision.files & revision.added) revision.files.update(revision.added) assert revision.files > revision.removed revision.files -= revision.removed assert revision.files > revision.modified, ( 'Expected to find the following files: ' + ','.join( file for file in revision.modified if not file in revision.files)) processed.add(revision) toprocess.extend(child for child in revision.children if not child in processed and not child in toprocess) assert set(parsing_result['revisions_rev'].values()) == processed return parsing_result def parse_url(self, url, rev_name=None): p = Popen(['hg', '--repository', self.tmpdir, 'incoming', '--style', 'xml', '--verbose', url, ] + (['--rev', rev_name] if rev_name else []), stdout=PIPE) p.stdout.readline() # Skip “comparing with {url}” header self.parser.parse(p.stdout) parsing_result = self.handler.export_result() self.generate_files(parsing_result) return parsing_result if __name__ == '__main__': import sys def print_files(revision): for file in revision.files: print file remote_url = sys.argv[1] rev_name = sys.argv[2] with MercurialRemoteParser() as remote_parser: parsing_result = remote_parser.parse_url(remote_url, rev_name=rev_name) assert len(parsing_result['tips']) == 1, 'Found more then one head' print_files(next(iter(parsing_result['tips']))) # vim: tw=100 ft=python ts=4 sts=4 sw=4 </revision>
Использование: python -O list_hg_files.py https://bitbucket.org/ZyX_I/aurum tip
. Оба аргумента (URL удалённого репозитория и обозначение ревизии) обязательны.
ссылка на оригинал статьи http://habrahabr.ru/post/197312/
Добавить комментарий