Add basic tests of backend objects

This commit is contained in:
Conor Hughes 2014-11-19 17:46:17 -08:00
parent 9e838fb2e7
commit 515a430a96
10 changed files with 225 additions and 7 deletions

View File

@ -99,7 +99,7 @@ class Application(object):
backups = self.get_due_backups()
backup_successes = []
for backup in backups:
if backup.perform():
if backup.perform(datetime.datetime.now(dateutil.tz.tzlocal())):
backup_successes.append(backup)
self.note_successful_backups(backup_successes)
self.logger.info("Successfully completed {}/{} backups.".format(len(backup_successes), len(backups)))

View File

@ -17,6 +17,9 @@ def register_backend_type(type, name):
raise Exception("Backend name collision: {}".format(name))
_BACKEND_TYPES[name] = type
def unregister_backend_type(name):
del _BACKEND_TYPES[name]
def backend_type(name):
return _BACKEND_TYPES.get(name, None)

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
import datetime
import itertools
@ -73,10 +74,10 @@ class Backup(object):
due = next_due_run(self.timespec, last_run)
return due < now
def perform(self):
def perform(self, now):
success = True
for backend in self.backends:
success = success and backend.perform(self.paths, self.name)
success = success and backend.perform(self.paths, self.name, now)
return success
def get_all_archives(self, backends=None):

View File

@ -66,12 +66,14 @@ class TarsnapBackend(backend_types.BackupBackend):
ctx.update(backup_name.decode("utf-8"))
return ctx.hexdigest()
def create_backup_instance_name(self, backup_name):
def create_backup_instance_name(self, backup_name, timestamp):
unixtime = time.mktime(timestamp.timetuple())
return "{}-{}-{}".format(self.create_backup_identifier(backup_name),
time.time(), backup_name)
unixtime, backup_name)
def perform(self, paths, backup_name):
backup_instance_name = self.create_backup_instance_name(backup_name)
def perform(self, paths, backup_name, now_timestamp):
backup_instance_name = self.create_backup_instance_name(backup_name,
now_timestamp)
self.logger.info("Creating backup \"{}\": {}"
.format(backup_instance_name, ", ".join(paths)))
tmpdir = tempfile.mkdtemp()

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
import os
import sys

View File

@ -0,0 +1,4 @@
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
""" Tests go here """

View File

@ -0,0 +1,30 @@
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
import os
import sys
import inspect
import unittest
def get_suite():
this_file = os.path.abspath(inspect.getsourcefile(lambda: None))
tests_dir = os.path.dirname(this_file)
package_dir = os.path.join(tests_dir, "..")
container_dir = os.path.join(package_dir, "..")
loader = unittest.defaultTestLoader
suite = loader.discover(package_dir, top_level_dir=container_dir)
return suite
def main():
try:
import mock
except ImportError:
sys.stderr.write("Need the mock library to run backupmgr tests.\n")
return 1
if unittest.TextTestRunner().run(get_suite()).wasSuccessful():
return 0
else:
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,38 @@
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
import unittest
import datetime
import dateutil
from .. import backend_types
from ..backup_backends import tarsnap
class TestBackendsRegistered(unittest.TestCase):
def test_types_registered(self):
self.assertIs(backend_types.backend_type("tarsnap"), tarsnap.TarsnapBackend)
class TestUnregistration(unittest.TestCase):
def test_unregistration_works(self):
class MyNewBackend(backend_types.BackupBackend):
NAMES = ("mytype",)
self.assertIs(backend_types.backend_type("mytype"), MyNewBackend)
backend_types.unregister_backend_type("mytype")
self.assertIsNone(backend_types.backend_type("mytype"))
class TestBackupBackendName(unittest.TestCase):
def test_str_correctness(self):
class MyBackend(backend_types.BackupBackend):
NAMES = ("mytype",)
self.assertEquals(str(MyBackend({"name":"foo"})), "MyBackend: foo")
backend_types.unregister_backend_type("mytype")
class TestArchiveBasics(unittest.TestCase):
def test_archive_datetime_property(self):
arch = backend_types.Archive()
ts = 1416279400
arch.timestamp = ts
dt = datetime.datetime.utcfromtimestamp(ts).replace(tzinfo=dateutil.tz.tzutc())
self.assertEqual(arch.datetime, dt)

View File

@ -0,0 +1,135 @@
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
import unittest
import datetime
import subprocess
import StringIO
import os
import mock
import dateutil
from ..backup_backends import tarsnap
from .. import backend_types
class TarasnapBackendClassTests(unittest.TestCase):
def test_is_registered(self):
self.assertIs(backend_types.backend_type("tarsnap"),
tarsnap.TarsnapBackend)
class TarsnapBackendTests(unittest.TestCase):
def setUp(self):
self.backend = tarsnap.TarsnapBackend({"keyfile": "/root/theKey.key",
"name": "test backend",
"host": "thishost"})
self.ts = datetime.datetime.fromtimestamp(1416279400,
dateutil.tz.tzlocal())
def test_dunder_str(self):
expected = "TarsnapBackend: test backend (thishost with /root/theKey.key)"
self.assertEquals(expected, str(self.backend))
def test_backup_instance_name(self):
name = self.backend.create_backup_instance_name("great", self.ts)
self.assertEquals('17b690276b0184062d03b56fbf0d66b775c2a19c-1416279400.0-great',
name)
def test_perform(self):
instance_mock = mock.NonCallableMock()
instance_mock.stdout = StringIO.StringIO("do not parse\nthis content")
with mock.patch("subprocess.Popen",
return_value=instance_mock) as mock_popen:
def do_wait():
# take this opportunity to check the directory and make sure
# the links are what we expect
path = mock_popen.call_args[0][0][2]
self.assertEquals(set(os.listdir(path)),
set(["one", "two", "three"]))
self.assertEquals(os.readlink(os.path.join(path, "one")), "/uno")
self.assertEquals(os.readlink(os.path.join(path, "two")), "/dos")
self.assertEquals(os.readlink(os.path.join(path, "three")), "/tres")
do_wait.tmpdir = path
return 0
do_wait.tmpdir = None
instance_mock.wait = do_wait
self.backend.perform({"/uno":"one", "/dos":"two", "/tres":"three"},
"mrgl", self.ts)
self.assertEquals(mock_popen.call_count, 1)
self.assertTrue(do_wait.tmpdir) # make sure we ran this bit
self.assertEquals(len(mock_popen.call_args[0]), 1)
self.assertEquals(mock_popen.call_args[0][0][:2], ["/usr/local/bin/tarsnap",
"-C"])
self.assertEquals(mock_popen.call_args[0][0][3:],
["-H", "-cf",
"712fded485ebd593f5954e38acb78ea437c15997-1416279400.0-mrgl",
"--keyfile", "/root/theKey.key", "one", "two", "three"])
self.assertEquals(mock_popen.call_args[1]["stderr"], subprocess.STDOUT)
self.assertEquals(mock_popen.call_args[1]["stdout"], subprocess.PIPE)
self.assertFalse(os.path.exists(do_wait.tmpdir)) # clean up your turds
def test_perform_exit_status(self):
instance_mock = mock.NonCallableMock()
instance_mock.stdout = StringIO.StringIO("boring stuff")
with mock.patch("subprocess.Popen",
return_value=instance_mock) as mock_popen:
instance_mock.wait = lambda: 0
self.assertTrue(self.backend.perform({"/foo" : "bar"}, "mrgl", self.ts))
instance_mock.wait = lambda: 1
self.assertFalse(self.backend.perform({"/foo" : "bar"}, "mrgl", self.ts))
def test_archive_listing_calls_correctly(self):
instance_mock = mock.NonCallableMagicMock()
with mock.patch("subprocess.Popen",
return_value=instance_mock) as mock_popen:
self.backend.existing_archives_for_name("nomatter")
mock_popen.assert_called_once_with(
["/usr/local/bin/tarsnap", "--list-archives", "--keyfile",
"/root/theKey.key"],
stdout=subprocess.PIPE)
def test_archive_listing_parses_correctly_basics(self):
instance_mock = mock.NonCallableMagicMock()
lines = [
"712fded485ebd593f5954e38acb78ea437c15997-1416279400.0-mrgl",
"712fded485ebd593f5954e38acb78ea437c1599f-1416280000.0-brgl",
"712fded485ebd593f5954e38acb78ea437c15997-1416369139.0-mrgl"
]
instance_mock.stdout = StringIO.StringIO("\n".join(lines))
instance_mock.wait = lambda: 0
with mock.patch("subprocess.Popen",
return_value=instance_mock) as mock_popen:
results = self.backend.existing_archives_for_name("mrgl")
self.assertEqual(len(results), 2)
for result in results:
self.assertIs(result.backend, self.backend)
for archive, fullname in zip(results, [line for line in lines if "mrgl" in line]):
self.assertEquals(archive.fullname, fullname)
for archive, time in zip(results, [1416279400, 1416369139]):
self.assertEquals(archive.timestamp, time)
class TestTarsnapArchive(unittest.TestCase):
def setUp(self):
self.backend = tarsnap.TarsnapBackend({"keyfile": "/root/theKey.key",
"name": "test backend",
"host": "thishost"})
self.archive = tarsnap.TarsnapArchive(
self.backend,
1416279400.0,
"712fded485ebd593f5954e38acb78ea437c15997-1416279400.0-mrgl")
def test_restore_invokes_tarsnap_correctly(self):
instance_mock = mock.NonCallableMagicMock()
instance_mock.wait = lambda: 0
with mock.patch("subprocess.Popen",
return_value=instance_mock) as mock_popen:
self.archive.restore("/tmp/nothing")
mock_popen.assert_called_once_with(
["/usr/local/bin/tarsnap", "-C", "/tmp/nothing", "-x", "-f",
"712fded485ebd593f5954e38acb78ea437c15997-1416279400.0-mrgl"],
stderr=subprocess.STDOUT, stdout=subprocess.PIPE)

4
tests.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/sh
cd `dirname $0`
exec python backupmgr/test