diff options
| author | Chad Smith <chad.smith@canonical.com> | 2017-09-18 21:21:40 (GMT) |
|---|---|---|
| committer | Chad Smith <chad.smith@canonical.com> | 2017-09-18 21:21:40 (GMT) |
| commit | a3a33b43c3c62b82c30050a78c69a11a5eca8c40 (patch) | |
| tree | 605749654c384a10321b4c6e584c9464fe21fec4 | |
| parent | 356586cb4e1adb2a4f12e087f45415e0cacff163 (diff) | |
add needs_exe param to temp_utils functions. Sprinkle some unit test goodness around temp_utils. Make callsites which need exe perms provide needs_exe
| -rw-r--r-- | cloudinit/net/dhcp.py | 2 | ||||
| -rw-r--r-- | cloudinit/temp_utils.py | 22 | ||||
| -rw-r--r-- | cloudinit/tests/test_temp_utils.py | 101 | ||||
| -rw-r--r-- | cloudinit/util.py | 2 |
4 files changed, 118 insertions, 9 deletions
diff --git a/cloudinit/net/dhcp.py b/cloudinit/net/dhcp.py index 60eb078..0535063 100644 --- a/cloudinit/net/dhcp.py +++ b/cloudinit/net/dhcp.py @@ -48,7 +48,7 @@ def maybe_perform_dhcp_discovery(nic=None): if not dhclient_path: LOG.debug('Skip dhclient configuration: No dhclient command found.') return {} - with temp_utils.tempdir(dir='/var/tmp', prefix='cloud-init-dhcp-') as tdir: + with temp_utils.tempdir(prefix='cloud-init-dhcp-', needs_exe=True) as tdir: # Use /var/tmp because /run/cloud-init/tmp is mounted noexec return dhcp_discovery(dhclient_path, nic, tdir) diff --git a/cloudinit/temp_utils.py b/cloudinit/temp_utils.py index 0355f19..5d7adf7 100644 --- a/cloudinit/temp_utils.py +++ b/cloudinit/temp_utils.py @@ -8,9 +8,10 @@ import tempfile _TMPDIR = None _ROOT_TMPDIR = "/run/cloud-init/tmp" +_EXE_ROOT_TMPDIR = "/var/tmp/cloud-init" -def _tempfile_dir_arg(odir=None): +def _tempfile_dir_arg(odir=None, needs_exe=False): """Return the proper 'dir' argument for tempfile functions. When root, cloud-init will use /run/cloud-init/tmp to avoid @@ -20,8 +21,10 @@ def _tempfile_dir_arg(odir=None): If the caller of this function (mkdtemp or mkstemp) was provided with a 'dir' argument, then that is respected. - @param odir: original 'dir' arg to 'mkdtemp' or other.""" - + @param odir: original 'dir' arg to 'mkdtemp' or other. + @param needs_exe: Boolean specifying whether or not exe permissions are + needed for tempdir. This is needed because /run is mounted noexec. + """ if odir is not None: return odir @@ -29,7 +32,9 @@ def _tempfile_dir_arg(odir=None): if _TMPDIR: return _TMPDIR - if os.getuid() == 0: + if needs_exe: + tdir = _EXE_ROOT_TMPDIR + elif os.getuid() == 0: tdir = _ROOT_TMPDIR else: tdir = os.environ.get('TMPDIR', '/tmp') @@ -42,7 +47,8 @@ def _tempfile_dir_arg(odir=None): def ExtendedTemporaryFile(**kwargs): - kwargs['dir'] = _tempfile_dir_arg(kwargs.pop('dir', None)) + kwargs['dir'] = _tempfile_dir_arg( + kwargs.pop('dir', None), kwargs.pop('needs_exe', False)) fh = tempfile.NamedTemporaryFile(**kwargs) # Replace its unlink with a quiet version # that does not raise errors when the @@ -82,12 +88,14 @@ def tempdir(**kwargs): def mkdtemp(**kwargs): - kwargs['dir'] = _tempfile_dir_arg(kwargs.pop('dir', None)) + kwargs['dir'] = _tempfile_dir_arg( + kwargs.pop('dir', None), kwargs.pop('needs_exe', False)) return tempfile.mkdtemp(**kwargs) def mkstemp(**kwargs): - kwargs['dir'] = _tempfile_dir_arg(kwargs.pop('dir', None)) + kwargs['dir'] = _tempfile_dir_arg( + kwargs.pop('dir', None), kwargs.pop('needs_exe', False)) return tempfile.mkstemp(**kwargs) # vi: ts=4 expandtab diff --git a/cloudinit/tests/test_temp_utils.py b/cloudinit/tests/test_temp_utils.py new file mode 100644 index 0000000..ffbb92c --- /dev/null +++ b/cloudinit/tests/test_temp_utils.py @@ -0,0 +1,101 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +"""Tests for cloudinit.temp_utils""" + +from cloudinit.temp_utils import mkdtemp, mkstemp +from cloudinit.tests.helpers import CiTestCase, wrap_and_call + + +class TestTempUtils(CiTestCase): + + def test_mkdtemp_default_non_root(self): + """mkdtemp creates a dir under /tmp for the unprivileged.""" + calls = [] + + def fake_mkdtemp(*args, **kwargs): + calls.append(kwargs) + return '/fake/return/path' + + retval = wrap_and_call( + 'cloudinit.temp_utils', + {'os.getuid': 1000, + 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp}, + '_TMPDIR': {'new': None}, + 'os.path.isdir': True}, + mkdtemp) + self.assertEqual('/fake/return/path', retval) + self.assertEqual([{'dir': '/tmp'}], calls) + + def test_mkdtemp_default_non_root_needs_exe(self): + """mkdtemp creates a dir under /var/tmp/cloud-init when needs_exe.""" + calls = [] + + def fake_mkdtemp(*args, **kwargs): + calls.append(kwargs) + return '/fake/return/path' + + retval = wrap_and_call( + 'cloudinit.temp_utils', + {'os.getuid': 1000, + 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp}, + '_TMPDIR': {'new': None}, + 'os.path.isdir': True}, + mkdtemp, needs_exe=True) + self.assertEqual('/fake/return/path', retval) + self.assertEqual([{'dir': '/var/tmp/cloud-init'}], calls) + + def test_mkdtemp_default_root(self): + """mkdtemp creates a dir under /run/cloud-init for the privileged.""" + calls = [] + + def fake_mkdtemp(*args, **kwargs): + calls.append(kwargs) + return '/fake/return/path' + + retval = wrap_and_call( + 'cloudinit.temp_utils', + {'os.getuid': 0, + 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp}, + '_TMPDIR': {'new': None}, + 'os.path.isdir': True}, + mkdtemp) + self.assertEqual('/fake/return/path', retval) + self.assertEqual([{'dir': '/run/cloud-init/tmp'}], calls) + + def test_mkstemp_default_non_root(self): + """mkstemp creates secure tempfile under /tmp for the unprivileged.""" + calls = [] + + def fake_mkstemp(*args, **kwargs): + calls.append(kwargs) + return '/fake/return/path' + + retval = wrap_and_call( + 'cloudinit.temp_utils', + {'os.getuid': 1000, + 'tempfile.mkstemp': {'side_effect': fake_mkstemp}, + '_TMPDIR': {'new': None}, + 'os.path.isdir': True}, + mkstemp) + self.assertEqual('/fake/return/path', retval) + self.assertEqual([{'dir': '/tmp'}], calls) + + def test_mkstemp_default_root(self): + """mkstemp creates a secure tempfile in /run/cloud-init for root.""" + calls = [] + + def fake_mkstemp(*args, **kwargs): + calls.append(kwargs) + return '/fake/return/path' + + retval = wrap_and_call( + 'cloudinit.temp_utils', + {'os.getuid': 0, + 'tempfile.mkstemp': {'side_effect': fake_mkstemp}, + '_TMPDIR': {'new': None}, + 'os.path.isdir': True}, + mkstemp) + self.assertEqual('/fake/return/path', retval) + self.assertEqual([{'dir': '/run/cloud-init/tmp'}], calls) + +# vi: ts=4 expandtab diff --git a/cloudinit/util.py b/cloudinit/util.py index 7e9d94f..4c01f44 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -1755,7 +1755,7 @@ def subp_blob_in_tempfile(blob, *args, **kwargs): args = [tuple()] # Use tmpdir over tmpfile to avoid 'text file busy' on execute - with temp_utils.tempdir() as tmpd: + with temp_utils.tempdir(needs_exe=True) as tmpd: tmpf = os.path.join(tmpd, basename) if 'args' in kwargs: kwargs['args'] = [tmpf] + list(kwargs['args']) |
