import os import platform import shutil import tempfile import unittest import mozharness.base.errors as errors import mozharness.base.vcs.mercurial as mercurial test_string = '''foo bar baz''' HG = ['hg'] + mercurial.HG_OPTIONS # Known default .hgrc os.environ['HGRCPATH'] = os.path.abspath(os.path.join(os.path.dirname(__file__), 'helper_files', '.hgrc')) def cleanup(): if os.path.exists('test_logs'): shutil.rmtree('test_logs') if os.path.exists('test_dir'): if os.path.isdir('test_dir'): shutil.rmtree('test_dir') else: os.remove('test_dir') for filename in ('localconfig.json', 'localconfig.json.bak'): if os.path.exists(filename): os.remove(filename) def get_mercurial_vcs_obj(): m = mercurial.MercurialVCS() m.config = {} return m def get_revisions(dest): m = get_mercurial_vcs_obj() retval = [] for rev in m.get_output_from_command(HG + ['log', '-R', dest, '--template', '{node}\n']).split('\n'): rev = rev.strip() if not rev: continue retval.append(rev) return retval class TestMakeAbsolute(unittest.TestCase): # _make_absolute() doesn't play nicely with windows/msys paths. # TODO: fix _make_absolute, write it out of the picture, or determine # that it's not needed on windows. if platform.system() not in ("Windows",): def test_absolute_path(self): m = get_mercurial_vcs_obj() self.assertEquals(m._make_absolute("/foo/bar"), "/foo/bar") def test_relative_path(self): m = get_mercurial_vcs_obj() self.assertEquals(m._make_absolute("foo/bar"), os.path.abspath("foo/bar")) def test_HTTP_paths(self): m = get_mercurial_vcs_obj() self.assertEquals(m._make_absolute("http://foo/bar"), "http://foo/bar") def test_absolute_file_path(self): m = get_mercurial_vcs_obj() self.assertEquals(m._make_absolute("file:///foo/bar"), "file:///foo/bar") def test_relative_file_path(self): m = get_mercurial_vcs_obj() self.assertEquals(m._make_absolute("file://foo/bar"), "file://%s/foo/bar" % os.getcwd()) class TestHg(unittest.TestCase): def _init_hg_repo(self, hg_obj, repodir): hg_obj.run_command(["bash", os.path.join(os.path.dirname(__file__), "helper_files", "init_hgrepo.sh"), repodir]) def setUp(self): self.tmpdir = tempfile.mkdtemp() self.repodir = os.path.join(self.tmpdir, 'repo') m = get_mercurial_vcs_obj() self._init_hg_repo(m, self.repodir) self.revisions = get_revisions(self.repodir) self.wc = os.path.join(self.tmpdir, 'wc') self.pwd = os.getcwd() def tearDown(self): shutil.rmtree(self.tmpdir) os.chdir(self.pwd) def test_get_branch(self): m = get_mercurial_vcs_obj() m.clone(self.repodir, self.wc) b = m.get_branch_from_path(self.wc) self.assertEquals(b, 'default') def test_get_branches(self): m = get_mercurial_vcs_obj() m.clone(self.repodir, self.wc) branches = m.get_branches_from_path(self.wc) self.assertEquals(sorted(branches), sorted(["branch2", "default"])) def test_clone(self): m = get_mercurial_vcs_obj() rev = m.clone(self.repodir, self.wc, update_dest=False) self.assertEquals(rev, None) self.assertEquals(self.revisions, get_revisions(self.wc)) self.assertEquals(sorted(os.listdir(self.wc)), ['.hg']) def test_clone_into_non_empty_dir(self): m = get_mercurial_vcs_obj() m.mkdir_p(self.wc) open(os.path.join(self.wc, 'test.txt'), 'w').write('hello') m.clone(self.repodir, self.wc, update_dest=False) self.failUnless(not os.path.exists(os.path.join(self.wc, 'test.txt'))) def test_clone_update(self): m = get_mercurial_vcs_obj() rev = m.clone(self.repodir, self.wc, update_dest=True) self.assertEquals(rev, self.revisions[0]) def test_clone_branch(self): m = get_mercurial_vcs_obj() m.clone(self.repodir, self.wc, branch='branch2', update_dest=False) # On hg 1.6, we should only have a subset of the revisions if m.hg_ver() >= (1, 6, 0): self.assertEquals(self.revisions[1:], get_revisions(self.wc)) else: self.assertEquals(self.revisions, get_revisions(self.wc)) def test_clone_update_branch(self): m = get_mercurial_vcs_obj() rev = m.clone(self.repodir, os.path.join(self.tmpdir, 'wc'), branch="branch2", update_dest=True) self.assertEquals(rev, self.revisions[1], self.revisions) def test_clone_revision(self): m = get_mercurial_vcs_obj() m.clone(self.repodir, self.wc, revision=self.revisions[0], update_dest=False) # We'll only get a subset of the revisions self.assertEquals(self.revisions[:1] + self.revisions[2:], get_revisions(self.wc)) def test_update_revision(self): m = get_mercurial_vcs_obj() rev = m.clone(self.repodir, self.wc, update_dest=False) self.assertEquals(rev, None) rev = m.update(self.wc, revision=self.revisions[1]) self.assertEquals(rev, self.revisions[1]) def test_pull(self): m = get_mercurial_vcs_obj() # Clone just the first rev m.clone(self.repodir, self.wc, revision=self.revisions[-1], update_dest=False) self.assertEquals(get_revisions(self.wc), self.revisions[-1:]) # Now pull in new changes rev = m.pull(self.repodir, self.wc, update_dest=False) self.assertEquals(rev, None) self.assertEquals(get_revisions(self.wc), self.revisions) def test_pull_revision(self): m = get_mercurial_vcs_obj() # Clone just the first rev m.clone(self.repodir, self.wc, revision=self.revisions[-1], update_dest=False) self.assertEquals(get_revisions(self.wc), self.revisions[-1:]) # Now pull in just the last revision rev = m.pull(self.repodir, self.wc, revision=self.revisions[0], update_dest=False) self.assertEquals(rev, None) # We'll be missing the middle revision (on another branch) self.assertEquals(get_revisions(self.wc), self.revisions[:1] + self.revisions[2:]) def test_pull_branch(self): m = get_mercurial_vcs_obj() # Clone just the first rev m.clone(self.repodir, self.wc, revision=self.revisions[-1], update_dest=False) self.assertEquals(get_revisions(self.wc), self.revisions[-1:]) # Now pull in the other branch rev = m.pull(self.repodir, self.wc, branch="branch2", update_dest=False) self.assertEquals(rev, None) # On hg 1.6, we'll be missing the last revision (on another branch) if m.hg_ver() >= (1, 6, 0): self.assertEquals(get_revisions(self.wc), self.revisions[1:]) else: self.assertEquals(get_revisions(self.wc), self.revisions) def test_pull_unrelated(self): m = get_mercurial_vcs_obj() # Create a new repo repo2 = os.path.join(self.tmpdir, 'repo2') self._init_hg_repo(m, repo2) self.assertNotEqual(self.revisions, get_revisions(repo2)) # Clone the original repo m.clone(self.repodir, self.wc, update_dest=False) # Hide the wanted error m.config = {'log_to_console': False} # Try and pull in changes from the new repo self.assertRaises(mercurial.VCSException, m.pull, repo2, self.wc, update_dest=False) def test_push(self): m = get_mercurial_vcs_obj() m.clone(self.repodir, self.wc, revision=self.revisions[-2]) m.push(src=self.repodir, remote=self.wc) self.assertEquals(get_revisions(self.wc), self.revisions) def test_push_with_branch(self): m = get_mercurial_vcs_obj() if m.hg_ver() >= (1, 6, 0): m.clone(self.repodir, self.wc, revision=self.revisions[-1]) m.push(src=self.repodir, remote=self.wc, branch='branch2') m.push(src=self.repodir, remote=self.wc, branch='default') self.assertEquals(get_revisions(self.wc), self.revisions) def test_push_with_revision(self): m = get_mercurial_vcs_obj() m.clone(self.repodir, self.wc, revision=self.revisions[-2]) m.push(src=self.repodir, remote=self.wc, revision=self.revisions[-1]) self.assertEquals(get_revisions(self.wc), self.revisions[-2:]) def test_mercurial(self): m = get_mercurial_vcs_obj() m.vcs_config = { 'repo': self.repodir, 'dest': self.wc, 'vcs_share_base': os.path.join(self.tmpdir, 'share'), } m.ensure_repo_and_revision() rev = m.ensure_repo_and_revision() self.assertEquals(rev, self.revisions[0]) def test_push_new_branches_not_allowed(self): m = get_mercurial_vcs_obj() m.clone(self.repodir, self.wc, revision=self.revisions[0]) # Hide the wanted error m.config = {'log_to_console': False} self.assertRaises(Exception, m.push, self.repodir, self.wc, push_new_branches=False) def test_mercurial_relative_dir(self): m = get_mercurial_vcs_obj() repo = os.path.basename(self.repodir) wc = os.path.basename(self.wc) m.vcs_config = { 'repo': repo, 'dest': wc, 'revision': self.revisions[-1], 'vcs_share_base': os.path.join(self.tmpdir, 'share'), } m.chdir(os.path.dirname(self.repodir)) try: rev = m.ensure_repo_and_revision() self.assertEquals(rev, self.revisions[-1]) m.info("Creating test.txt") open(os.path.join(self.wc, 'test.txt'), 'w').write("hello!") m = get_mercurial_vcs_obj() m.vcs_config = { 'repo': repo, 'dest': wc, 'revision': self.revisions[0], 'vcs_share_base': os.path.join(self.tmpdir, 'share'), } rev = m.ensure_repo_and_revision() self.assertEquals(rev, self.revisions[0]) # Make sure our local file didn't go away self.failUnless(os.path.exists(os.path.join(self.wc, 'test.txt'))) finally: m.chdir(self.pwd) def test_mercurial_update_tip(self): m = get_mercurial_vcs_obj() m.vcs_config = { 'repo': self.repodir, 'dest': self.wc, 'revision': self.revisions[-1], 'vcs_share_base': os.path.join(self.tmpdir, 'share'), } rev = m.ensure_repo_and_revision() self.assertEquals(rev, self.revisions[-1]) open(os.path.join(self.wc, 'test.txt'), 'w').write("hello!") m = get_mercurial_vcs_obj() m.vcs_config = { 'repo': self.repodir, 'dest': self.wc, 'vcs_share_base': os.path.join(self.tmpdir, 'share'), } rev = m.ensure_repo_and_revision() self.assertEquals(rev, self.revisions[0]) # Make sure our local file didn't go away self.failUnless(os.path.exists(os.path.join(self.wc, 'test.txt'))) def test_mercurial_update_rev(self): m = get_mercurial_vcs_obj() m.vcs_config = { 'repo': self.repodir, 'dest': self.wc, 'revision': self.revisions[-1], 'vcs_share_base': os.path.join(self.tmpdir, 'share'), } rev = m.ensure_repo_and_revision() self.assertEquals(rev, self.revisions[-1]) open(os.path.join(self.wc, 'test.txt'), 'w').write("hello!") m = get_mercurial_vcs_obj() m.vcs_config = { 'repo': self.repodir, 'dest': self.wc, 'revision': self.revisions[0], 'vcs_share_base': os.path.join(self.tmpdir, 'share'), } rev = m.ensure_repo_and_revision() self.assertEquals(rev, self.revisions[0]) # Make sure our local file didn't go away self.failUnless(os.path.exists(os.path.join(self.wc, 'test.txt'))) def test_make_hg_url(self): #construct an hg url specific to revision, branch and filename and try to pull it down file_url = mercurial.make_hg_url( "hg.mozilla.org", '//build/tools/', revision='FIREFOX_3_6_12_RELEASE', filename="/lib/python/util/hg.py", protocol='https', ) expected_url = "https://hg.mozilla.org/build/tools/raw-file/FIREFOX_3_6_12_RELEASE/lib/python/util/hg.py" self.assertEquals(file_url, expected_url) def test_make_hg_url_no_filename(self): file_url = mercurial.make_hg_url( "hg.mozilla.org", "/build/tools", revision="default", protocol='https', ) expected_url = "https://hg.mozilla.org/build/tools/rev/default" self.assertEquals(file_url, expected_url) def test_make_hg_url_no_revision_no_filename(self): repo_url = mercurial.make_hg_url( "hg.mozilla.org", "/build/tools", protocol='https', ) expected_url = "https://hg.mozilla.org/build/tools" self.assertEquals(repo_url, expected_url) def test_make_hg_url_different_protocol(self): repo_url = mercurial.make_hg_url( "hg.mozilla.org", "/build/tools", protocol='ssh', ) expected_url = "ssh://hg.mozilla.org/build/tools" self.assertEquals(repo_url, expected_url) def test_apply_and_push(self): m = get_mercurial_vcs_obj() m.clone(self.repodir, self.wc) def c(repo, attempt): m.run_command(HG + ['tag', '-f', 'TEST'], cwd=repo) m.apply_and_push(self.wc, self.repodir, c) self.assertEquals(get_revisions(self.wc), get_revisions(self.repodir)) def test_apply_and_push_fail(self): m = get_mercurial_vcs_obj() m.clone(self.repodir, self.wc) def c(repo, attempt, remote): m.run_command(HG + ['tag', '-f', 'TEST'], cwd=repo) m.run_command(HG + ['tag', '-f', 'CONFLICTING_TAG'], cwd=remote) m.config = {'log_to_console': False} self.assertRaises(errors.VCSException, m.apply_and_push, self.wc, self.repodir, lambda r, a: c(r, a, self.repodir), max_attempts=2) def test_apply_and_push_with_rebase(self): m = get_mercurial_vcs_obj() m.clone(self.repodir, self.wc) m.config = {'log_to_console': False} def c(repo, attempt, remote): m.run_command(HG + ['tag', '-f', 'TEST'], cwd=repo) if attempt == 1: m.run_command(HG + ['rm', 'hello.txt'], cwd=remote) m.run_command(HG + ['commit', '-m', 'test'], cwd=remote) m.apply_and_push(self.wc, self.repodir, lambda r, a: c(r, a, self.repodir), max_attempts=2) self.assertEquals(get_revisions(self.wc), get_revisions(self.repodir)) def test_apply_and_push_rebase_fails(self): m = get_mercurial_vcs_obj() m.clone(self.repodir, self.wc) m.config = {'log_to_console': False} def c(repo, attempt, remote): m.run_command(HG + ['tag', '-f', 'TEST'], cwd=repo) if attempt in (1, 2): m.run_command(HG + ['tag', '-f', 'CONFLICTING_TAG'], cwd=remote) m.apply_and_push(self.wc, self.repodir, lambda r, a: c(r, a, self.repodir), max_attempts=4) self.assertEquals(get_revisions(self.wc), get_revisions(self.repodir)) def test_apply_and_push_on_branch(self): m = get_mercurial_vcs_obj() if m.hg_ver() >= (1, 6, 0): m.clone(self.repodir, self.wc) def c(repo, attempt): m.run_command(HG + ['branch', 'branch3'], cwd=repo) m.run_command(HG + ['tag', '-f', 'TEST'], cwd=repo) m.apply_and_push(self.wc, self.repodir, c) self.assertEquals(get_revisions(self.wc), get_revisions(self.repodir)) def test_apply_and_push_with_no_change(self): m = get_mercurial_vcs_obj() m.clone(self.repodir, self.wc) def c(r, a): pass self.assertRaises(errors.VCSException, m.apply_and_push, self.wc, self.repodir, c) if __name__ == '__main__': unittest.main()