Commit 14887e45 authored by Ivo Nutar

Initial commit

Tags: 2020.4.1
Showing 1036 additions and 0 deletions
[run]
omit =
*/cryton_worker/etc/*
.env 0 → 100644
CRYTON_WORKER_MODULES_DIR=/opt/cryton-worker/modules/
CRYTON_WORKER_DEBUG=False
CRYTON_WORKER_MSFRPCD_PASS=toor
CRYTON_WORKER_MSFRPCD_USERNAME=msf
CRYTON_WORKER_MSFRPCD_PORT=55553
CRYTON_WORKER_MSFRPCD_SSL=False
CRYTON_WORKER_RABBIT_USERNAME=admin
CRYTON_WORKER_RABBIT_PASSWORD=mypass
CRYTON_WORKER_RABBIT_SRV_ADDR=CHANGE_ME
CRYTON_WORKER_RABBIT_SRV_PORT=5672
CRYTON_WORKER_RABBIT_WORKER_PREFIX=CHANGE_ME
CRYTON_WORKER_CORE_COUNT=0
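The CI script later in this commit exports these variables with a shell loop (`for i in `cat .env`; do export $i; done`). A minimal, illustrative Python sketch of the same idea — the `load_env` helper and the sample text are not part of the project:

```python
import os

def load_env(text: str) -> dict:
    """Parse simple KEY=VALUE lines (as in the .env file) into a dict."""
    env = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, _, value = line.partition('=')
        env[key] = value
    return env

sample = "CRYTON_WORKER_DEBUG=False\nCRYTON_WORKER_MSFRPCD_PORT=55553\n"
parsed = load_env(sample)
os.environ.update(parsed)  # make the values visible to os.getenv()
```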
__pycache__/
*.py[cod]
*$py.class
.idea/
*.egg-info
reports/
build/
dist/
log/
store.sqlite3
image: python:3.8-alpine
variables:
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
cache:
paths:
- .cache/pip
- venv/
before_script:
- python --version
- pip install virtualenv
- virtualenv venv
- for i in `cat .env`; do export $i; done
- source venv/bin/activate
- python setup.py install
- python setup.py develop
stages:
- unittests
- buildPython
unittests:
stage: unittests
artifacts:
paths:
- .cache/pip
- venv
script:
- pytest --cov=cryton_worker tests/unit_tests/ --cov-config=.coveragerc-unit
coverage: '/TOTAL.*\s+(\d+%)$/'
buildPython:
stage: buildPython
script:
- python3 setup.py bdist_wheel
artifacts:
paths:
- dist
- .cache/pip
- venv/
expire_in: 10 mins
FROM python:3.8-alpine
WORKDIR /worker
COPY . /worker
RUN [ "python", "/worker/setup.py", "install" ]
MIT License
Copyright (c) 2019 Masaryk University
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
README.md 0 → 100644
<!-- TOC depthFrom:1 depthTo:6 withLinks:1 updateOnSave:1 orderedList:0 -->
- [Name](#name)
- [Description](#description)
- [Dependencies](#dependencies)
- [Installation](#installation)
- [From source](#from-source)
- [Docker](#docker)
- [Usage](#usage)
- [Starting Worker (Slave server)](#starting-worker-slave-server)
- [Manual execution](#manual-execution)
- [Service execution](#service-execution)
<!-- /TOC -->
![Coverage](https://gitlab.ics.muni.cz/cryton/cryton-worker/badges/master/coverage.svg)
# Name
Cryton worker
# Description
Cryton worker is a module for executing Cryton attack modules both locally and remotely. To control Cryton worker, Cryton Core is needed (https://gitlab.ics.muni.cz/cryton/cryton-core).
Cryton worker utilizes RabbitMQ (https://www.rabbitmq.com/) as its messaging protocol for asynchronous RPC.
# Dependencies
## For docker
* docker.io
* docker-compose
## For manual installation
* python3.8
* metasploit-framework
* (optional) pipenv
# Installation
Important note: this guide only explains how to install the **Cryton Worker** package. To be able to execute attack scenarios, you also need to install the **Cryton Core** package. If you want to use the attack modules provided by Cryton, you also have to install **Cryton Modules**.
For a correct installation you need to update the `.env` file. For example, `CRYTON_WORKER_RABBIT_SRV_ADDR` must contain the same address as your RabbitMQ server, and `CRYTON_WORKER_RABBIT_WORKER_PREFIX` must be the same as the one saved in Cryton Core.
## From source (recommended)
For manual installation, all you need to do is **export the variables** and **run the setup script**.
~~~~
$ python3 setup.py install
~~~~
You can also install this inside a virtual environment using pipenv.
~~~~
$ pipenv shell
(cryton-worker) $ python setup.py install
~~~~
## Docker
**NOTICE: The following guide won't describe how to install or mount applications used by modules.**
First make sure you have Docker installed:
~~~~
user@localhost:~ $ sudo apt install docker.io docker-compose
~~~~
Add yourself to the docker group so you can use the Docker CLI without sudo:
~~~~
user@localhost:~ $ sudo groupadd docker
user@localhost:~ $ sudo usermod -aG docker $USER
user@localhost:~ $ newgrp docker
user@localhost:~ $ docker run hello-world
~~~~
Now, run docker-compose, which will pull, build and start all necessary docker images:
~~~~
user@localhost:~ $ cd cryton-worker/
user@localhost:~ /cryton-worker $ docker-compose up -d
~~~~
This process might take a while, especially the first time you run it, since the Cryton-Worker image must be built.
After a while you should see something like this:
~~~~
Creating cryton_worker ... done
~~~~
Everything should be set. Check if the installation was successful:
~~~~
user@localhost:~ /cryton-worker $ docker-compose ps
Name Command State Ports
---------------------------------------------
cryton_worker cryton-worker Up
~~~~
If it is in the `Restarting` state, something went wrong. Please check that the RabbitMQ address is correct.
# Usage
For Cryton Core to be able to run its attack modules, it has to command the Worker service, which takes care of execution. If such a Worker service runs remotely (on a separate machine), it is called a **Slave**. Installing a Slave is the same as installing locally - all you need to do is install the package and start the service.
If you want to execute the modules using Slaves, you have to provide a **hosts file** - a file with the Slaves you have under control. This file also contains the other hosts you specify in the Plan template.
## Starting Worker (Slave server)
There are two ways of running the listening Worker. You can either run it manually or create a service to be run by systemd.
### Manual execution
~~~~
$ cryton-worker
[*] Waiting for messages. To exit press CTRL+C
~~~~
### Service execution
For configuring the service, you can simply copy the file from the repository (*cryton_worker/etc/systemd-service/cryton-worker.service*).
*Note: change `WorkingDirectory` to the actual path of your Cryton Worker directory.*
~~~~
[Unit]
Description=Cryton worker
After=multi-user.target
Conflicts=getty@tty1.service
[Service]
Type=simple
WorkingDirectory=/opt/cryton-worker
ExecStart=/usr/local/bin/pipenv run cryton-worker
StandardInput=tty-force
[Install]
WantedBy=multi-user.target
~~~~
import click
from cryton_worker.etc import config
from cryton_worker.lib import worker
@click.group()
@click.version_option()
def cli() -> None:
"""
Cryton Worker CLI.
\f
:return: None
"""
pass
@cli.command('start')
@click.option('--install-requirements', is_flag=True,
help='Install Python requirements from each requirements.txt in modules_dir.')
@click.option('-Ru', '--rabbit-username', type=click.STRING, default=config.RABBIT_USERNAME, show_default=True,
help='Rabbit login username.')
@click.option('-Rp', '--rabbit-password', type=click.STRING, default=config.RABBIT_PASSWORD, show_default=True,
help='Rabbit login password.')
@click.option('-Rh', '--rabbit-host', type=click.STRING, default=config.RABBIT_SRV_ADDR, show_default=True,
help='Rabbit server host.')
@click.option('-RP', '--rabbit-port', type=click.INT, default=config.RABBIT_SRV_PORT, show_default=True,
help='Rabbit server port.')
@click.option('-p', '--prefix', type=click.STRING, default=config.RABBIT_WORKER_PREFIX, show_default=True,
help='What prefix should the Worker use.')
@click.option('-cc', '--core-count', type=click.INT, default=config.CORE_COUNT, show_default=True,
help='How many processes to use for queues.')
@click.option('-mr', '--max-retries', type=click.INT, default=3, show_default=True,
help='How many times to try to connect.')
def start_worker(install_requirements: bool, rabbit_username: str, rabbit_password: str,
rabbit_host: str, rabbit_port: int, prefix: str, core_count: int, max_retries: int) -> None:
"""
Start Worker and try to connect to the Rabbit server.
\f
:param core_count: How many processes to use for queues
:param prefix: What prefix should the Worker use
:param rabbit_port: Rabbit server port
:param rabbit_host: Rabbit server host
:param rabbit_password: Rabbit login password
:param rabbit_username: Rabbit login username
:param max_retries: How many times to try to connect
:param install_requirements: Install Python requirements from each requirements.txt in modules_dir
:return: None
"""
if install_requirements:
worker.install_modules_requirements()
worker.start(rabbit_host, rabbit_port, rabbit_username, rabbit_password, prefix, core_count, max_retries)
[LOGGING_CONFIG]
LOG_CONFIG: /etc/cryton/logging_config_worker.yaml
import os
import configparser
CRYTON_WORKER_CONFIG_PATH = os.getenv('CRYTON_WORKER_CONFIG', '/etc/cryton/config-worker.ini')
DEBUG = os.getenv('CRYTON_WORKER_DEBUG') == 'True'
MSFRPCD_PASS = os.getenv('CRYTON_WORKER_MSFRPCD_PASS')
MSFRPCD_USERNAME = os.getenv('CRYTON_WORKER_MSFRPCD_USERNAME')
MSFRPCD_PORT = int(os.getenv('CRYTON_WORKER_MSFRPCD_PORT'))
MSFRPCD_SSL = os.getenv('CRYTON_WORKER_MSFRPCD_SSL') == 'True'
MODULES_DIR = os.getenv('CRYTON_WORKER_MODULES_DIR')
RABBIT_USERNAME = os.getenv('CRYTON_WORKER_RABBIT_USERNAME')
RABBIT_PASSWORD = os.getenv('CRYTON_WORKER_RABBIT_PASSWORD')
RABBIT_SRV_ADDR = os.getenv('CRYTON_WORKER_RABBIT_SRV_ADDR')
RABBIT_SRV_PORT = int(os.getenv('CRYTON_WORKER_RABBIT_SRV_PORT'))
RABBIT_WORKER_PREFIX = os.getenv('CRYTON_WORKER_RABBIT_WORKER_PREFIX')
CORE_COUNT = int(os.getenv('CRYTON_WORKER_CORE_COUNT'))
config = configparser.ConfigParser(allow_no_value=True)
config.read(CRYTON_WORKER_CONFIG_PATH)
LOG_CONFIG = config.get('LOGGING_CONFIG', 'LOG_CONFIG')
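The module above calls `int()` directly on `os.getenv()` results, so it raises `TypeError` when a variable is unset. A hedged sketch of the same parsing with fallbacks — the helper names and defaults are illustrative, not part of the project:

```python
import os

def env_int(name: str, default: int) -> int:
    """Read an integer environment variable, falling back to a default."""
    raw = os.getenv(name)
    return int(raw) if raw is not None and raw.isdigit() else default

def env_bool(name: str, default: bool = False) -> bool:
    """Read a 'True'/'False' environment variable as a bool."""
    raw = os.getenv(name)
    return default if raw is None else raw == 'True'

MSFRPCD_PORT = env_int('CRYTON_WORKER_MSFRPCD_PORT', 55553)
DEBUG = env_bool('CRYTON_WORKER_DEBUG')
```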
---
version: 1
disable_existing_loggers: False
formatters:
simple:
format: "%(message)s"
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: simple
stream: ext://sys.stdout
sys-logger:
class: logging.handlers.SysLogHandler
level: INFO
address: '/dev/log'
formatter: simple
root:
level: NOTSET
handlers: [console]
propagate: yes
loggers:
cryton-worker:
level: INFO
handlers: [sys-logger]
propagate: no
cryton-worker-debug:
level: DEBUG
handlers: [console]
propagate: yes
...
[Unit]
Description=Cryton executor
After=multi-user.target
Conflicts=getty@tty1.service
[Service]
Type=simple
WorkingDirectory=/opt/cryton-executor
ExecStart=/usr/local/bin/pipenv run cryton-executor
StandardInput=tty-force
[Install]
WantedBy=multi-user.target
from cryton_worker.lib import util
def process_event(event_t: str, body: dict) -> dict:
"""
Decide what to do based on event
:param event_t: Event type
:param body: Event value
:return: event result
"""
if event_t == 'KILL_EXECUTION':
event_v = event_kill_execution(body)
elif event_t == 'VALIDATE_MODULE':
event_v = event_validate_module(body)
elif event_t == 'LIST_MODULES':
event_v = event_list_modules(body)
elif event_t == 'LIST_SESSIONS':
event_v = event_list_sessions(body)
elif event_t == 'HEALTHCHECK':
event_v = event_health_check(body)
else:
event_v = {'return_code': -2}
return event_v
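The if/elif chain in `process_event` can equivalently be written as a dispatch table, which makes adding new events a one-line change. A self-contained sketch — the handler body here is a stub for illustration, only `HEALTHCHECK` is wired up:

```python
def event_health_check(body: dict) -> dict:
    # Stub handler mirroring the real event_health_check above.
    return {'return_code': 0}

EVENT_HANDLERS = {
    'HEALTHCHECK': event_health_check,
    # 'KILL_EXECUTION': event_kill_execution, etc. would be registered here.
}

def process_event(event_t: str, body: dict) -> dict:
    """Look up the handler for the event type; -2 signals an unknown event."""
    handler = EVENT_HANDLERS.get(event_t)
    if handler is None:
        return {'return_code': -2}
    return handler(body)
```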
def event_validate_module(body: dict) -> dict:
"""
Event that, when triggered, validates the requested module.
:param body: Arguments for function
:return: Result dictionary containing details about the job.
"""
attack_module = body.get('attack_module')
module_arguments = body.get('attack_module_arguments')
result_dict = util.validate_module(attack_module, module_arguments)
return result_dict
def event_list_modules(_: dict) -> dict:
"""
Event that, when triggered, lists all modules available on the Worker.
:param _: Arguments for function
:return: Result dictionary containing details about the job.
"""
result = util.list_modules()
result_dict = {'module_list': result}
return result_dict
def event_list_sessions(body: dict) -> dict:
"""
Event that, when triggered, lists all sessions.
:param body: Arguments for function
:return: Result dictionary containing details about the job.
"""
target_ip = body.get('target_ip')
result = util.list_sessions(target_ip)
result_dict = {'session_list': result}
return result_dict
def event_kill_execution(_: dict) -> dict: # TODO: implement later
"""
Event that, when triggered, kills the Step's Execution.
:param _: Arguments for function
:return: Result dictionary containing details about the job.
"""
result = util.kill_execution()
result_dict = {'return_code': result}
return result_dict
def event_health_check(_: dict) -> dict: # TODO: implement later
"""
Event that, when triggered, checks if the Worker is OK.
:param _: Arguments for function
:return: Result dictionary containing details about the job.
"""
result = 0
result_dict = {'return_code': result}
return result_dict
class Error(Exception):
"""Base class for exceptions in this module."""
pass
# TODO: custom exceptions?
import structlog
import logging.config
from cryton_worker.etc import config
import yaml
"""
Default Cryton logger setup and configuration
"""
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
with open(config.LOG_CONFIG, 'rt') as f:
config_file = yaml.safe_load(f.read())
logging.config.dictConfig(config_file)
if config.DEBUG:
logger = structlog.get_logger("cryton-worker-debug")
logger.setLevel(logging.DEBUG)
else:
logger = structlog.get_logger("cryton-worker")
logger.setLevel(logging.INFO)
import importlib
from cryton_worker.lib import logger
import sys
import glob
import base64
import os
from schema import Schema
from pymetasploit3.msfrpc import MsfRpcClient
from cryton_worker.etc import config
def execute_module(name: str, arguments: dict) -> dict:
"""
Execute specified module by name with arguments.
:param name: name of the module relative to config.MODULES_DIR
:param arguments: additional module arguments
:return: execution result
"""
logger.logger.info("Executing module", module_name=name, arguments=arguments)
ret = run_executable(name, arguments)
# Encode file contents as base64
ret_file = ret.get('file')
if isinstance(ret_file, dict):
file_contents = ret_file.get('file_contents')
if not isinstance(file_contents, (bytes, str)):
file_contents = bytes(str(file_contents), 'utf8')
if isinstance(file_contents, str):
file_contents = bytes(file_contents, 'utf8')
if isinstance(file_contents, bytes):
# If already encoded
if ret_file.get('file_contents_encoding') == 'base64':
file_contents_b64 = file_contents
else:
file_contents_b64 = base64.b64encode(file_contents)
# conversion back from bytes to string
ret_file.update({'file_contents': file_contents_b64.decode(), 'file_contents_encoding': 'base64'})
# TODO: file content must be saved somewhere or else it will break the execution, or at least send an err msg
else:
raise TypeError('Cannot parse type "{}" of data in "file"'.format(type(file_contents)))
logger.logger.info("Module execution finished", module_name=name, arguments=arguments, ret=ret)
return ret
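The branching in `execute_module` above normalizes whatever the module returned under `file_contents` into a base64-encoded string. The same logic extracted as a standalone helper (the function name is illustrative):

```python
import base64

def normalize_file_contents(ret_file: dict) -> dict:
    """Ensure 'file_contents' ends up as a base64-encoded str, as in execute_module."""
    contents = ret_file.get('file_contents')
    if not isinstance(contents, (bytes, str)):
        contents = str(contents).encode('utf8')   # coerce arbitrary values
    if isinstance(contents, str):
        contents = contents.encode('utf8')
    if ret_file.get('file_contents_encoding') == 'base64':
        encoded = contents                         # already encoded, keep as-is
    else:
        encoded = base64.b64encode(contents)
    ret_file.update({'file_contents': encoded.decode(),
                     'file_contents_encoding': 'base64'})
    return ret_file
```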
def run_executable(module_path: str, executable_args: dict) -> dict:
"""
Run module specified by name and arguments. The module does not have to be installed,
as the path is being added to the system PATH.
:param module_path: path to the module directory relative to config.MODULES_DIR
:param executable_args: additional module arguments
:return: execution result
"""
try:
module_obj = import_module(module_path)
except Exception as ex:
return {'return_code': -2, 'std_err': 'Either module {} does not exist or there is some other problem. '
'Original error: {}.'.format(module_path, ex)}
try:
executable = module_obj.execute
except AttributeError:
ret = {'return_code': -2, 'std_err': 'Module {} does not have execute function'.format(module_path)}
else:
try:
ret = executable(executable_args)
except Exception as ex:
ret = {'return_code': -1, 'std_err': str(ex)}
return ret
def validate_module(name: str, arguments: dict) -> dict:
"""
Validate specified module by name with arguments.
:param name: name of the module relative to config.MODULES_DIR
:param arguments: additional module arguments
:return: validation result
"""
logger.logger.info("Validating module", module_name=name, arguments=arguments)
try:
module_obj = import_module(name)
except Exception as ex:
return {'return_code': -2, 'std_err': 'Either module {} does not exist or there is some other problem. '
'Original error: {}.'.format(name, ex)}
try:
executable = module_obj.validate
except AttributeError:
# validate function does not exist
ret = {'return_code': -2, 'std_err': 'Module {} does not have validate function.'.format(name)}
else:
try:
ret_val = executable(arguments)
ret = {'return_code': ret_val, 'std_out': 'Module {} is valid.'.format(name)}
except Exception as ex:
ret = {'return_code': -1, 'std_err': 'Module {} is not valid. Original exception: {}'.format(name, ex)}
logger.logger.info("Module validation finished", module_name=name, arguments=arguments, ret=ret)
return ret
def import_module(module_path: str):
"""
Import module specified by name. The module does not have to be installed,
as the path is being added to the system PATH.
:param module_path: path to the module directory relative to config.MODULES_DIR
:return: imported module object
"""
sys.path.insert(0, config.MODULES_DIR)
# importlib expects a dotted module path, so convert the directory path
module = module_path.strip('/').replace('/', '.') + '.mod'
module_obj = importlib.import_module(module)
return module_obj
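`import_module` loads `<module_dir>/mod.py` by putting the modules directory on `sys.path` and importing a dotted path. A self-contained sketch of the mechanism against a throwaway directory — the `my_module` name and the stub `execute` function are illustrative:

```python
import importlib
import os
import sys
import tempfile

# Create a throwaway modules directory containing my_module/mod.py.
modules_dir = tempfile.mkdtemp()
pkg_dir = os.path.join(modules_dir, 'my_module')
os.makedirs(pkg_dir)
with open(os.path.join(pkg_dir, '__init__.py'), 'w') as f:
    f.write('')
with open(os.path.join(pkg_dir, 'mod.py'), 'w') as f:
    f.write('def execute(args):\n    return {"return_code": 0}\n')

# Import it the way util.import_module does: dotted path under the modules dir.
sys.path.insert(0, modules_dir)
mod = importlib.import_module('my_module.mod')
result = mod.execute({})
```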
def list_sessions(target_ip: str) -> list:
"""
Get list of available session IDs for given target IP.
:param target_ip: target IP
:return: list of session IDs
"""
client = MsfRpcClient(config.MSFRPCD_PASS, username=config.MSFRPCD_USERNAME,
port=config.MSFRPCD_PORT, ssl=config.MSFRPCD_SSL)
sessions_list = list()
for session_key, session_value in client.sessions.list.items():
if session_value['target_host'] == target_ip or session_value['tunnel_peer'].split(':')[0] == target_ip:
sessions_list.append(session_key)
logger.logger.info("Listing sessions", target=target_ip, sessions_list=sessions_list)
return sessions_list
def list_modules() -> list:
"""
Get list of available modules.
:return: list of available modules
"""
default_modules_dir = config.MODULES_DIR
# List all python files, exclude init files
files = [f.replace(default_modules_dir, '') for f in glob.glob(default_modules_dir + "**/*.py", recursive=True)]
files = list(filter(lambda a: a.find('__init__.py') == -1, files))
logger.logger.info("Listing modules", modules_list=files)
return files
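`list_modules` above relies on a recursive glob with `__init__.py` filtered out. A runnable sketch of that discovery step against a temporary tree (the `scan`/`brute` module names are made up for the example):

```python
import glob
import os
import tempfile

modules_dir = tempfile.mkdtemp() + os.sep
# Lay out two modules; __init__.py files should be excluded from the listing.
for rel in ('scan/mod.py', 'scan/__init__.py', 'brute/mod.py'):
    path = os.path.join(modules_dir, rel)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    open(path, 'w').close()

# Same pattern as list_modules: recursive glob, then strip the base directory.
files = [f.replace(modules_dir, '') for f in glob.glob(modules_dir + '**/*.py', recursive=True)]
files = [f for f in files if '__init__.py' not in f]
```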
def kill_execution() -> int: # TODO: implement later on
"""
Kill chosen execution.
:return: return code
"""
logger.logger.info("Killing execution")
return -1
def get_file_content(file_path: str) -> bytes:
"""
Get file binary content from path
:param file_path: path to wanted file
:return: binary content of the desired file
"""
with open(file_path, 'rb') as bf:
file_content = bf.read()
return file_content
# schema file validation function
class File(object):
"""
Schema-compatible validator that applies its sub-schemas in AND fashion and checks that the resulting value is an existing file.
"""
def __init__(self, *args, **kw):
self._args = args
if not set(kw).issubset({"error", "schema", "ignore_extra_keys"}):
diff = {"error", "schema", "ignore_extra_keys"}.difference(kw)
raise TypeError("Unknown keyword arguments %r" % list(diff))
self._error = kw.get("error")
self._ignore_extra_keys = kw.get("ignore_extra_keys", False)
# You can pass your inherited Schema class.
self._schema = kw.get("schema", Schema)
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, ", ".join(repr(a) for a in self._args))
def validate(self, data):
"""
Validate data using defined sub schema/expressions ensuring all
values are valid.
:param data: to be validated with sub defined schemas.
:return: returns validated data
"""
for s in [self._schema(s, error=self._error, ignore_extra_keys=self._ignore_extra_keys) for s in self._args]:
data = s.validate(data)
if os.path.isfile(data):
return data
else:
raise Exception("{} is not a valid file.".format(data))
# schema directory validation function
class Dir(object):
"""
Schema-compatible validator that applies its sub-schemas in AND fashion and checks that the resulting value is an existing directory.
"""
def __init__(self, *args, **kw):
self._args = args
if not set(kw).issubset({"error", "schema", "ignore_extra_keys"}):
diff = {"error", "schema", "ignore_extra_keys"}.difference(kw)
raise TypeError("Unknown keyword arguments %r" % list(diff))
self._error = kw.get("error")
self._ignore_extra_keys = kw.get("ignore_extra_keys", False)
# You can pass your inherited Schema class.
self._schema = kw.get("schema", Schema)
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, ", ".join(repr(a) for a in self._args))
def validate(self, data):
"""
Validate data using defined sub schema/expressions ensuring all
values are valid.
:param data: to be validated with sub defined schemas.
:return: returns validated data
"""
for s in [self._schema(s, error=self._error, ignore_extra_keys=self._ignore_extra_keys) for s in self._args]:
data = s.validate(data)
if os.path.isdir(data):
return data
else:
raise Exception("{} is not a valid directory.".format(data))
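The `File` and `Dir` classes depend on the third-party `schema` library for their sub-schema handling. The core idea — a validator whose `validate` returns the value or raises — can be shown with the stdlib alone; the `FilePath` class below is a hypothetical stand-in, not the project's API:

```python
import os
import tempfile

class FilePath:
    """Minimal stand-in for the File validator: value must name an existing file."""
    def validate(self, data: str) -> str:
        if os.path.isfile(data):
            return data
        raise ValueError('{} is not a valid file.'.format(data))

# Create a real file so validation succeeds.
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.close()
validated = FilePath().validate(tmp.name)
```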
from cryton_worker.lib import event, util
from cryton_worker.etc import config
from cryton_worker.lib import logger
import datetime
import json
import amqpstorm
import subprocess
import os
import sys
from threading import Thread
from multiprocessing import Process, Event
import time
class ThreadConsumer:
def __init__(self, callback: callable, message: amqpstorm.Message):
"""
Consumer.
:param callback: Custom Rabbit callback.
:param message: RabbitMQ Message.
"""
self.callback = callback
self.message = message
self.correlation_id = self.message.correlation_id
self._stopped = False
def __call__(self):
"""
Rabbit callback for executing custom callbacks and sending response to reply_to.
:return: None
"""
self.message.ack()
logger.logger.debug('Consuming callback.', callback=self.callback, message=self.message)
message_body = json.loads(self.message.body)
ret = self.callback(message_body)
ret = json.dumps(ret)
self.message.channel.queue.declare(self.message.reply_to)
self.message.properties.update({'content_encoding': 'utf-8'})
self.message.properties.update({'timestamp': datetime.datetime.now()})
response = amqpstorm.Message.create(self.message.channel, ret, self.message.properties)
response.publish(self.message.reply_to)
def stop(self): # TODO: finish later
pass
class ProcessConsumer:
def __init__(self, hostname: str, username: str, password: str, port: int, queue: str,
callback_executable: callable, max_retries: int):
"""
ProcessConsumer
:param hostname: RabbitMQ server hostname.
:param username: Username to use when connecting to RabbitMQ.
:param password: Password to use when connecting to RabbitMQ.
:param port: RabbitMQ server port.
:param queue: Rabbit queue, that will be consumed.
:param callback_executable: Custom Rabbit callback.
:param max_retries: Maximum number of connection attempts.
"""
self.hostname = hostname
self.username = username
self.password = password
self.port = port
self.queue = queue
self.callback_executable = callback_executable
self.max_retries = max_retries
self._connection: amqpstorm.Connection or None = None
self._consumers = []
self.stopped = Event()
logger.logger.debug('ProcessConsumer created.', queue=self.queue, callback=self.callback_executable)
def start(self):
"""
Start the Consumers.
:return: None
"""
self.stopped.clear()
if not self._connection or self._connection.is_closed:
self._create_connection()
while not self.stopped.is_set():
try:
# Check our connection for errors.
self._connection.check_for_errors()
if not self._connection.is_open:
raise amqpstorm.AMQPConnectionError('connection closed')
channel = self._connection.channel()
channel.basic.qos(1)
channel.queue.declare(self.queue)
channel.basic.consume(self._create_consumer, self.queue)
channel.start_consuming()
except amqpstorm.AMQPError as ex:
# If an error occurs, re-connect and re-open the channel.
logger.logger.warning(ex)
self._stop_consumers()
self._create_connection()
def stop(self):
"""
Stop.
:return: None
"""
self._stop_consumers()
self.stopped.set()
if self._connection is not None:
self._connection.close()
def _stop_consumers(self):
"""
Stop all consumers.
:return: None
"""
while self._consumers:
consumer: ThreadConsumer = self._consumers.pop()
consumer.stop()
def _create_connection(self):
"""
Create a Rabbit connection.
:return: None
"""
attempts = 0
while True:
attempts += 1
if self.stopped.is_set():
break
try:
self._connection = amqpstorm.Connection(self.hostname, self.username, self.password, self.port)
logger.logger.info('Successfully connected.', hostname=self.hostname, port=self.port, queue=self.queue)
print('Successfully created connection.')
break
except amqpstorm.AMQPError as ex:
logger.logger.warning(ex)
if self.max_retries and attempts > self.max_retries:
logger.logger.error('max number of retries reached')
self.stop()
raise Exception('max number of retries reached')
time.sleep(min(attempts * 2, 30))
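`_create_connection` retries with a linear backoff capped at 30 seconds (`min(attempts * 2, 30)`). The delay schedule in isolation, with the helper name chosen for the example:

```python
def backoff_delay(attempt: int, cap: int = 30) -> int:
    """Delay before reconnect attempt `attempt` (1-based), capped at `cap` seconds."""
    return min(attempt * 2, cap)

delays = [backoff_delay(a) for a in range(1, 6)]  # first five retry delays
```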
def _create_consumer(self, message: amqpstorm.Message):
"""
Create a ThreadConsumer for a new message.
:param message: RabbitMQ Message.
:return: None
"""
consumer = ThreadConsumer(self.callback_executable, message)
self._start_consumer(consumer)
self._consumers.append(consumer)
@staticmethod
def _start_consumer(consumer: ThreadConsumer):
"""
Start a consumer in new Thread.
:param consumer: Consumer instance.
:return: None
"""
thread = Thread(target=consumer)
thread.daemon = True
thread.start()
class Worker:
def __init__(self, rabbit_host: str, rabbit_port: int, rabbit_username: str, rabbit_password: str, prefix: str,
core_count: int, max_retries: int = 3):
"""
Worker.
"""
self.hostname = rabbit_host
self.port = rabbit_port
self.username = rabbit_username
self.password = rabbit_password
self.prefix = prefix
self.max_retries = max_retries
if core_count == 0:
self.core_count = len(os.sched_getaffinity(0))
else:
self.core_count = core_count
self._consumers = []
self._processes = []
self._stopped = False
logger.logger.debug('Worker created.', hostname=self.hostname, port=self.port, prefix=self.prefix,
core_count=self.core_count)
def start(self, queues: dict):
"""
Check consumer condition, create queues and their consumers in processes.
:param queues: Rabbit queues (without prefix) with callbacks (queue_name: queue_callback)
:return: None
"""
# Prepare processes for each queue
for queue, callback in queues.items():
queue_with_prefix = queue.format(self.prefix)
self._create_processes_for_queue(queue_with_prefix, callback)
logger.logger.info('Worker is running, waiting for connections...', hostname=self.hostname, port=self.port,
prefix=self.prefix)
print('Worker is running, waiting for connections...')
# Start prepared processes
for process in self._processes:
process.start()
# Keep Worker alive
while not self._stopped:
all_stopped = True
for consumer in self._consumers:
if not consumer.stopped.is_set():
all_stopped = False
if all_stopped:
self.stop()
else:
time.sleep(5)
def stop(self):
"""
Stop Worker and its consumers.
:return: None
"""
while self._consumers:
consumer: ProcessConsumer = self._consumers.pop()
consumer.stop()
self._stopped = True
def _create_processes_for_queue(self, queue: str, callback: callable):
"""
Create a ProcessConsumer (and its Process) for each available core.
:param queue: Rabbit queue, that will be consumed.
:param callback: Custom Rabbit callback.
:return: None
"""
for _ in range(self.core_count):
consumer = ProcessConsumer(self.hostname, self.username, self.password, self.port, queue, callback,
self.max_retries)
self._consumers.append(consumer)
self._processes.append(Process(target=consumer.start))
def callback_attack(message_body: dict) -> dict:
"""
Callback function for executing modules.
:param message_body: RabbitMQ Message body.
:return: Response containing details about the job.
"""
module_name = message_body.get('attack_module')
module_arguments = message_body.get('attack_module_arguments')
ret = util.execute_module(module_name, module_arguments)
return ret
def callback_control(message_body: dict) -> dict:
"""
Callback function for controlling Worker.
:param message_body: RabbitMQ Message body.
:return: Response containing details about the job.
"""
event_t = message_body.pop('event_t')
event_v = event.process_event(event_t, message_body)
ret = {'event_t': event_t, 'event_v': event_v}
return ret
def start(rabbit_host: str, rabbit_port: int, rabbit_username: str, rabbit_password: str, prefix: str, core_count: int,
max_retries: int):
"""
Create a Worker which connects to the Rabbit server and listens for new messages.
:return: None
"""
attack_q = 'cryton_worker.{}.attack.request'
control_q = 'cryton_worker.{}.control.request'
queues = {attack_q: callback_attack, control_q: callback_control}
worker = Worker(rabbit_host, rabbit_port, rabbit_username, rabbit_password, prefix, core_count, max_retries)
worker.start(queues)
def install_modules_requirements():
"""
Go through module directories and install all requirement files.
:return: None
"""
for root, dirs, files in os.walk(config.MODULES_DIR):
for filename in files:
if filename == "requirements.txt":
subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", os.path.join(root, filename)])
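`install_modules_requirements` walks `MODULES_DIR` and pip-installs every `requirements.txt` it finds. A sketch of just the discovery step against a temporary tree — the pip call is replaced by collecting paths so the example stays side-effect free:

```python
import os
import tempfile

# Build a throwaway modules directory with one requirements file.
modules_dir = tempfile.mkdtemp()
mod_dir = os.path.join(modules_dir, 'scanner')
os.makedirs(mod_dir)
with open(os.path.join(mod_dir, 'requirements.txt'), 'w') as f:
    f.write('requests\n')

found = []
for root, dirs, files in os.walk(modules_dir):
    for filename in files:
        if filename == 'requirements.txt':
            found.append(os.path.join(root, filename))
# Each path in `found` would be passed to `pip install -r` in the real function.
```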