Skip to content
Snippets Groups Projects
Commit 1a299aee authored by Jiří Rája's avatar Jiří Rája 🔥
Browse files

Merge branch 'fix-read-shell-output' into 'master'

Added minimal_execution_time parameter to make sure the output is captured

See merge request cryton/cryton-worker!58
parents 81728ce8 973a5e51
No related branches found
No related tags found
No related merge requests found
......@@ -209,34 +209,44 @@ class Metasploit:
logger.logger.debug("Finished listing sessions.", found_sessions=found_sessions)
return found_sessions
def read_shell_output(self, session_id: str, timeout: int = None) -> str:
def read_shell_output(self, session_id: str, timeout: int = None, minimal_execution_time: int = None) -> str:
"""
Read whole output from shell in session.
:param session_id: Metasploit session ID
:param timeout: Timeout for reading from shell
:param minimal_execution_time: Time to wait for the output before reading from the shell
:return: Data from session
"""
shell = self.client.sessions.session(session_id)
result = ""
timestamp = time.time()
if minimal_execution_time is None:
minimal_execution_time = 3
if timeout:
timeout = time.time() + timeout
timeout = timestamp + max(timeout, minimal_execution_time)
while shell_data := shell.read():
minimal_execution_time = timestamp + minimal_execution_time
while (shell_data := shell.read()) or (time.time() < minimal_execution_time):
result += shell_data
if timeout and time.time() >= timeout:
break
time.sleep(1)
return result
def execute_in_session(self, command: str, session_id: str, timeout: int = None, end_check: list = None,
close: bool = False) -> str:
close: bool = False, minimal_execution_time: int = None) -> str:
"""
Execute command in MSF session. Optionally close it.
:param command: Command to execute
:param session_id: Metasploit session ID
:param end_check: Letters that when found will end output gathering from exploit execution
:param close: If the session should be closed after executing the command
:param timeout: Timeout for reading from shell
:param timeout: Timeout for reading from shell (no end_check only)
:param minimal_execution_time: Time to wait for the output before reading from the shell (no end_check only)
:raises:
KeyError if session cannot be read
:return: Output from the shell
......@@ -250,7 +260,7 @@ class Metasploit:
result = shell.run_with_output(cmd=command, end_strs=end_check)
else:
shell.write(command)
result = self.read_shell_output(session_id, timeout)
result = self.read_shell_output(session_id, timeout, minimal_execution_time)
if close:
shell.stop()
......
[tool.poetry]
name = "cryton-worker"
version = "1.0.2"
version = "1.1.0"
description = "Attack scenario orchestrator for Cryton"
authors = [
"Ivo Nutár <nutar@ics.muni.cz>",
......
......@@ -160,6 +160,7 @@ class TestMetasploit(TestCase):
self.assertEqual([], ret)
@patch("pymetasploit3.msfrpc.ShellSession")
@patch("time.sleep", Mock())
def test_execute_in_session(self, shell_session_mock):
self.mock_client.return_value.sessions.session.return_value = shell_session_mock
shell_session_mock.read.side_effect = ["data_in_buffer", ""]
......@@ -172,6 +173,7 @@ class TestMetasploit(TestCase):
self.assertEqual(shell_session_mock.read.call_count, 2)
@patch("pymetasploit3.msfrpc.ShellSession")
@patch("time.sleep", Mock())
def test_execute_in_session_with_timeout(self, shell_session_mock):
self.mock_client.return_value.sessions.session.return_value = shell_session_mock
shell_session_mock.read.return_value = ""
......@@ -182,6 +184,7 @@ class TestMetasploit(TestCase):
self.assertEqual(result, "command_output")
@patch("pymetasploit3.msfrpc.ShellSession")
@patch("time.sleep", Mock())
def test_execute_in_session_check_end(self, shell_session_mock):
self.mock_client.return_value.sessions.session.return_value = shell_session_mock
shell_session_mock.read.return_value = ""
......@@ -191,24 +194,37 @@ class TestMetasploit(TestCase):
self.mock_client.return_value.sessions.session.assert_called_once_with("session_id")
@patch("pymetasploit3.msfrpc.ShellSession")
@patch("time.sleep", Mock())
def test_read_shell_output(self, shell_session_mock):
self.mock_client.return_value.sessions.session.return_value = shell_session_mock
shell_session_mock.read.side_effect = ["final", "result", ""]
result = self.msf.read_shell_output("1", None)
result = self.msf.read_shell_output("1", None, 0)
self.assertEqual(result, "finalresult")
self.assertEqual(shell_session_mock.read.call_count, 3)
@patch("time.time", Mock(side_effect=[1, 3]))
@patch("pymetasploit3.msfrpc.ShellSession")
@patch("time.sleep", Mock())
def test_read_shell_output_with_timeout(self, shell_session_mock):
self.mock_client.return_value.sessions.session.return_value = shell_session_mock
shell_session_mock.read.side_effect = ["first", "second"]
result = self.msf.read_shell_output("1", 1)
result = self.msf.read_shell_output("1", 1, 1)
self.assertEqual(result, "first")
self.assertEqual(shell_session_mock.read.call_count, 1)
@patch("time.time", Mock(side_effect=[1, 2, 3]))
@patch("pymetasploit3.msfrpc.ShellSession")
@patch("time.sleep", Mock())
def test_read_shell_output_with_minimal_execution_time(self, shell_session_mock):
self.mock_client.return_value.sessions.session.return_value = shell_session_mock
shell_session_mock.read.side_effect = ["", "second", ""]
result = self.msf.read_shell_output("1", minimal_execution_time=2)
self.assertEqual(result, "second")
self.assertEqual(shell_session_mock.read.call_count, 3)
def test_execute_exploit(self):
mock_exploit = Mock()
exploit_name = "test_exploit"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment