summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChad Smith <chad.smith@canonical.com>2017-09-18 21:21:40 (GMT)
committerChad Smith <chad.smith@canonical.com>2017-09-18 21:21:40 (GMT)
commita3a33b43c3c62b82c30050a78c69a11a5eca8c40 (patch)
tree605749654c384a10321b4c6e584c9464fe21fec4
parent356586cb4e1adb2a4f12e087f45415e0cacff163 (diff)
add needs_exe param to temp_utils functions. Sprinkle some unit test goodness around temp_utils. Make callsites which need exe perms provide needs_exe
-rw-r--r--cloudinit/net/dhcp.py2
-rw-r--r--cloudinit/temp_utils.py22
-rw-r--r--cloudinit/tests/test_temp_utils.py101
-rw-r--r--cloudinit/util.py2
4 files changed, 118 insertions, 9 deletions
diff --git a/cloudinit/net/dhcp.py b/cloudinit/net/dhcp.py
index 60eb078..0535063 100644
--- a/cloudinit/net/dhcp.py
+++ b/cloudinit/net/dhcp.py
@@ -48,7 +48,7 @@ def maybe_perform_dhcp_discovery(nic=None):
if not dhclient_path:
LOG.debug('Skip dhclient configuration: No dhclient command found.')
return {}
- with temp_utils.tempdir(dir='/var/tmp', prefix='cloud-init-dhcp-') as tdir:
+ with temp_utils.tempdir(prefix='cloud-init-dhcp-', needs_exe=True) as tdir:
# Use /var/tmp because /run/cloud-init/tmp is mounted noexec
return dhcp_discovery(dhclient_path, nic, tdir)
diff --git a/cloudinit/temp_utils.py b/cloudinit/temp_utils.py
index 0355f19..5d7adf7 100644
--- a/cloudinit/temp_utils.py
+++ b/cloudinit/temp_utils.py
@@ -8,9 +8,10 @@ import tempfile
_TMPDIR = None
_ROOT_TMPDIR = "/run/cloud-init/tmp"
+_EXE_ROOT_TMPDIR = "/var/tmp/cloud-init"
-def _tempfile_dir_arg(odir=None):
+def _tempfile_dir_arg(odir=None, needs_exe=False):
"""Return the proper 'dir' argument for tempfile functions.
When root, cloud-init will use /run/cloud-init/tmp to avoid
@@ -20,8 +21,10 @@ def _tempfile_dir_arg(odir=None):
If the caller of this function (mkdtemp or mkstemp) was provided
with a 'dir' argument, then that is respected.
- @param odir: original 'dir' arg to 'mkdtemp' or other."""
-
+ @param odir: original 'dir' arg to 'mkdtemp' or other.
+ @param needs_exe: Boolean specifying whether or not exe permissions are
+ needed for tempdir. This is needed because /run is mounted noexec.
+ """
if odir is not None:
return odir
@@ -29,7 +32,9 @@ def _tempfile_dir_arg(odir=None):
if _TMPDIR:
return _TMPDIR
- if os.getuid() == 0:
+ if needs_exe:
+ tdir = _EXE_ROOT_TMPDIR
+ elif os.getuid() == 0:
tdir = _ROOT_TMPDIR
else:
tdir = os.environ.get('TMPDIR', '/tmp')
@@ -42,7 +47,8 @@ def _tempfile_dir_arg(odir=None):
def ExtendedTemporaryFile(**kwargs):
- kwargs['dir'] = _tempfile_dir_arg(kwargs.pop('dir', None))
+ kwargs['dir'] = _tempfile_dir_arg(
+ kwargs.pop('dir', None), kwargs.pop('needs_exe', False))
fh = tempfile.NamedTemporaryFile(**kwargs)
# Replace its unlink with a quiet version
# that does not raise errors when the
@@ -82,12 +88,14 @@ def tempdir(**kwargs):
def mkdtemp(**kwargs):
- kwargs['dir'] = _tempfile_dir_arg(kwargs.pop('dir', None))
+ kwargs['dir'] = _tempfile_dir_arg(
+ kwargs.pop('dir', None), kwargs.pop('needs_exe', False))
return tempfile.mkdtemp(**kwargs)
def mkstemp(**kwargs):
- kwargs['dir'] = _tempfile_dir_arg(kwargs.pop('dir', None))
+ kwargs['dir'] = _tempfile_dir_arg(
+ kwargs.pop('dir', None), kwargs.pop('needs_exe', False))
return tempfile.mkstemp(**kwargs)
# vi: ts=4 expandtab
diff --git a/cloudinit/tests/test_temp_utils.py b/cloudinit/tests/test_temp_utils.py
new file mode 100644
index 0000000..ffbb92c
--- /dev/null
+++ b/cloudinit/tests/test_temp_utils.py
@@ -0,0 +1,101 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Tests for cloudinit.temp_utils"""
+
+from cloudinit.temp_utils import mkdtemp, mkstemp
+from cloudinit.tests.helpers import CiTestCase, wrap_and_call
+
+
+class TestTempUtils(CiTestCase):
+
+ def test_mkdtemp_default_non_root(self):
+ """mkdtemp creates a dir under /tmp for the unprivileged."""
+ calls = []
+
+ def fake_mkdtemp(*args, **kwargs):
+ calls.append(kwargs)
+ return '/fake/return/path'
+
+ retval = wrap_and_call(
+ 'cloudinit.temp_utils',
+ {'os.getuid': 1000,
+ 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp},
+ '_TMPDIR': {'new': None},
+ 'os.path.isdir': True},
+ mkdtemp)
+ self.assertEqual('/fake/return/path', retval)
+ self.assertEqual([{'dir': '/tmp'}], calls)
+
+ def test_mkdtemp_default_non_root_needs_exe(self):
+ """mkdtemp creates a dir under /var/tmp/cloud-init when needs_exe."""
+ calls = []
+
+ def fake_mkdtemp(*args, **kwargs):
+ calls.append(kwargs)
+ return '/fake/return/path'
+
+ retval = wrap_and_call(
+ 'cloudinit.temp_utils',
+ {'os.getuid': 1000,
+ 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp},
+ '_TMPDIR': {'new': None},
+ 'os.path.isdir': True},
+ mkdtemp, needs_exe=True)
+ self.assertEqual('/fake/return/path', retval)
+ self.assertEqual([{'dir': '/var/tmp/cloud-init'}], calls)
+
+ def test_mkdtemp_default_root(self):
+ """mkdtemp creates a dir under /run/cloud-init for the privileged."""
+ calls = []
+
+ def fake_mkdtemp(*args, **kwargs):
+ calls.append(kwargs)
+ return '/fake/return/path'
+
+ retval = wrap_and_call(
+ 'cloudinit.temp_utils',
+ {'os.getuid': 0,
+ 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp},
+ '_TMPDIR': {'new': None},
+ 'os.path.isdir': True},
+ mkdtemp)
+ self.assertEqual('/fake/return/path', retval)
+ self.assertEqual([{'dir': '/run/cloud-init/tmp'}], calls)
+
+ def test_mkstemp_default_non_root(self):
+ """mkstemp creates secure tempfile under /tmp for the unprivileged."""
+ calls = []
+
+ def fake_mkstemp(*args, **kwargs):
+ calls.append(kwargs)
+ return '/fake/return/path'
+
+ retval = wrap_and_call(
+ 'cloudinit.temp_utils',
+ {'os.getuid': 1000,
+ 'tempfile.mkstemp': {'side_effect': fake_mkstemp},
+ '_TMPDIR': {'new': None},
+ 'os.path.isdir': True},
+ mkstemp)
+ self.assertEqual('/fake/return/path', retval)
+ self.assertEqual([{'dir': '/tmp'}], calls)
+
+ def test_mkstemp_default_root(self):
+ """mkstemp creates a secure tempfile in /run/cloud-init for root."""
+ calls = []
+
+ def fake_mkstemp(*args, **kwargs):
+ calls.append(kwargs)
+ return '/fake/return/path'
+
+ retval = wrap_and_call(
+ 'cloudinit.temp_utils',
+ {'os.getuid': 0,
+ 'tempfile.mkstemp': {'side_effect': fake_mkstemp},
+ '_TMPDIR': {'new': None},
+ 'os.path.isdir': True},
+ mkstemp)
+ self.assertEqual('/fake/return/path', retval)
+ self.assertEqual([{'dir': '/run/cloud-init/tmp'}], calls)
+
+# vi: ts=4 expandtab
diff --git a/cloudinit/util.py b/cloudinit/util.py
index 7e9d94f..4c01f44 100644
--- a/cloudinit/util.py
+++ b/cloudinit/util.py
@@ -1755,7 +1755,7 @@ def subp_blob_in_tempfile(blob, *args, **kwargs):
args = [tuple()]
# Use tmpdir over tmpfile to avoid 'text file busy' on execute
- with temp_utils.tempdir() as tmpd:
+ with temp_utils.tempdir(needs_exe=True) as tmpd:
tmpf = os.path.join(tmpd, basename)
if 'args' in kwargs:
kwargs['args'] = [tmpf] + list(kwargs['args'])