Multithread Remote SSH with Python

De Wiki Clusterlab.com.br
Revisão de 18h10min de 25 de janeiro de 2022 por Damato (discussão | contribs) (→‎src/configs.py)
Ir para navegação Ir para pesquisar

run.py

#!/usr/bin/env python
from src.activeInventory import ActiveInventory
from src.configs import Config
from src.argmenu import ArgMenu
import time
class main:
    def remoteExecution():
        argsmenu = ArgMenu()
        args = argsmenu.get_args()
        activeInventory = ActiveInventory(args.outdir)
        credfile = Config(args.cred)
        credentials = credfile.get_data("credentials")
        with open(args.servers, "r") as servers:
            control = 1
            count = 0
            for server in servers:
                count = count + 1
                if control == 0:
                    control = 1
                    continue
                for credential in credentials:
                    # time.sleep(1)
                    if control == 0:
                        break
                    if activeInventory.run_remote_command(command=args.cmd,
                                                          username=credential["username"],
                                                          password=credential["password"],
                                                          prefix=args.prefix,
                                                          server=server.replace("\n", "")) == 0:
                        control = 0

if __name__ == "__main__":
    main.remoteExecution()

runp.py

#!/usr/bin/env python
from src.activeInventory import ActiveInventory
from src.configs import Config
from src.argmenu import ArgMenu
import queue
import threading
import socket

exitFlag = 0


class MyThread(threading.Thread):
    def __init__(self, thread_id, name, q):
        threading.Thread.__init__(self)
        self.threadID = thread_id
        self.name = name
        self.q = q

    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)


def is_open(ip, port):
    a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    location = (ip, int(port))
    result_of_check = a_socket.connect_ex(location)

    if result_of_check == 0:
        return True
    else:
        return False


def process_data(thread_name, q):
    print(thread_name)
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            server = q.get()
            queueLock.release()
            argsmenu = ArgMenu()
            args = argsmenu.get_args()
            active_inventory = ActiveInventory(args.outdir)
            credfile = Config(args.cred)
            credentials = credfile.get_data("credentials")
            control = 1
            if args.timeout is not None:
                timeout = args.timeout
            else:
                timeout = 10
            if is_open(server.replace("\n", ""), 22) == True:
                for credential in credentials:
                    # time.sleep(1)
                    if control == 0:
                        break
                    if active_inventory.run_remote_command(command=args.cmd,
                                                           username=credential["username"],
                                                           password=credential["password"],
                                                           prefix=args.prefix,
                                                           server=server.replace("\n", ""),
                                                           timeout=timeout) == 0:
                        control = 0

    else:
        queueLock.release()


if __name__ == "__main__":
    argsmenu = ArgMenu()
    args = argsmenu.get_args()
    queueSize = 6000
    # queueSize = sum(1 for line in open(str(args.servers).replace("\n", "")))
    if args.threads is not None:
        threadCount = int(args.threads)
    else:
        threadCount = 10
    threadList = range(1, threadCount)
    queueLock = threading.Lock()
    workQueue = queue.Queue(queueSize)
    # workQueue = queue.Queue(int(queueSize) + 1)
    threads = []
    threadID = 1

    # Fill the queue
    queueLock.acquire()
    with open(args.servers, "r") as servers:
        for server in servers:
            workQueue.put(str(server).replace("\n", ""))
    queueLock.release()
    # Create new threads
    for tName in threadList:
        thread = MyThread(threadID, tName, workQueue)
        thread.start()
        threads.append(thread)
        threadID += 1
    # Wait for queue to empty
    while not workQueue.empty():
        pass

    # Notify threads it's time to exit
    exitFlag = 1

    # Wait for all threads to complete
    for t in threads:
        t.join()
    print("Exiting Main Thread")
    # main.remoteExecution()

src/activeinventory.py

import paramiko
import logging


class ActiveInventory:
    def __init__(self, outputpath):
        self.outputpath = outputpath
        # self.thredname = thredname

    def run_remote_command(self, command, username, password, server, prefix="output", timeout=10) -> int:
        logging.basicConfig(format='%(asctime)-15s %(levelname)s %(message)s')
        # logging.basicConfig(format='{%(asctime)}-15s %(self.thredname)s %()s %(message)s')
        logging.getLogger().setLevel(logging.DEBUG)
        # k = paramiko.RSAKey.from_private_key_file("pathtofile")
        # k = paramiko.RSAKey.from_private_key("keystring")
        sshclient = paramiko.SSHClient()
        sshclient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        exitcode = 127
        try:
            sshclient.connect(hostname=server, port=22, username=username, password=password, timeout=int(timeout))

        except paramiko.ssh_exception.AuthenticationException as e:
            logging.error(f"AUTH {server} with username {username}, MSG: {e}".replace("\n", ""))
        except paramiko.ssh_exception.SSHException as e:
            logging.error(f"SSHException {server} with username {username}, MSG: {e}".replace("\n", ""))
        # except paramiko.ssh_exception.SSHException as e:
        #     logging.error(f"Error reading SSH protocol banner for server {server}, MSG: {e}".replace("\n", ""))
        except Exception as e:
            logging.error(f"SSH {server} with username {username}, MSG: {e}: {e}".replace("\n", ""))
        else:
            logging.debug(f"SSH SUCCESSFUL FOR SERVER {server} WITH USER {username} AND PASSWORD {password}")
            logging.info(f"RUNNING COMMAND \"{command}\" FOR {server}".replace("\n", ""))
            stdin, stdout, stderr = sshclient.exec_command(command)
            outcode = str(stdout.read()).replace("\\n", "\n").replace("\\t", "\t")
            errcode = str(stderr.read()).replace("\\n", "\n").replace("\\t", "\t")
            exitcode = stdout.channel.recv_exit_status()
            self.sink(server=server, outcode=outcode, errcode=errcode, output=outcode, exitcode=exitcode, prefix=prefix)
            logging.info(f"EXITCODE {exitcode} FOR COMMAND \"{command}\" ON SERVER {server}".replace("\n", ""))
        finally:
            logging.info(f"EXECUTION END {server} WITH USER {username}")
            sshclient.close()
            return exitcode

    def sink(self, server, outcode, errcode, exitcode, output, prefix):
        try:
            with open(self.outputpath + "/" + prefix + "-" + server, "w+") as file:
                file.write(output)
        except Exception as e:
            print(f"Fail to writefile: {e}")
        finally:
            file.close()

src/argmenu.py

import argparse


class ArgMenu:
    def __init__(self):
        self.parser = argparse.ArgumentParser(description="Remote execution with output log, by D´Amato.")
        self.parser.add_argument('--cred', type=str, help="path to a json file with credentials.")
        self.parser.add_argument("--servers", type=str, help="path to a file with a list of servers, one per line.")
        self.parser.add_argument("--cmd", type=str, help="string command to run remotely")
        self.parser.add_argument("--outdir", type=str, help="directory path for the executions output")
        self.parser.add_argument("--prefix", type=str, help="a prefix for every single output file")
        self.parser.add_argument("--timeout", type=str,
                                 help="Number of seconds which session will timeout. Good to increase in slow network")
        self.parser.add_argument("--threads", type=str, help="Number of threads")

    def get_args(self):
        args = self.parser.parse_args()
        if args.cred is not None \
                and args.servers is not None \
                and args.cmd is not None \
                and args.outdir is not None \
                and args.prefix is not None:
            return self.parser.parse_args()
        else:
            print("Insufficient arguments. use -h")
            exit(1)

src/configs.py

import json


class Config:
    def __init__(self, configfile):
        self.data = json.loads(open(configfile).read())
        self.credentials = self.data["credentials"]

    def get_data(self, key):
        return self.data[key]