Deep SSL inspection [Tutorial] [Python] [Version=alpha]

Started by Nambis, October 01, 2023, 05:08:01 PM

Previous topic - Next topic
Hello,

I've been dabbling a bit with Python and have developed a proxy environment with which traffic is first decrypted, inspected by an IPS, and then re-encrypted.

The whole scenario works by means of an SSL terminating proxy, which is connected upstream of the IPS, the downstream proxy2 independently establishes an HTTPS/SSL connection for example to a web server and transmits the data again unencrypted to proxy1.

With this configuration the data stream can be examined, I succeeded to detect Eicar which should be downloaded via HTTPS and to prevent the download automatically, by Suricata IPS.

This solution is still in alpha stage and there are still problems especially for SSL pinning, so not all websites can be accessed without further ado. Furthermore, there may be errors in the display of images, videos or other content.

Schema:




Proxy1 script:

import socket
import ssl
import threading

import subprocess
import os


IP = '0.0.0.0'
PORT = 3128

CERT_FILE = 'server.crt'
KEY_FILE = 'server.key'


PROXY2_IP = '192.168.1.212' 
PROXY2_PORT = 3128 

PROXY2_TIMEOUT = 60  # wait 60 sec.

BUFFER_SIZE = 8128

def handle_client(client_sock):
    try:

        request = client_sock.recv(BUFFER_SIZE)


        if request.startswith(b"CONNECT"):

            response = b"HTTP/1.1 200 Connection Established\r\n\r\n"
            client_sock.sendall(response)


            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            context.load_cert_chain(certfile=CERT_FILE, keyfile=KEY_FILE)
            secured_sock = context.wrap_socket(client_sock, server_side=True)

            request = secured_sock.recv(BUFFER_SIZE)
       

            print(request)


            proxy2_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            proxy2_sock.connect((PROXY2_IP, PROXY2_PORT))
            proxy2_sock.sendall(request)


            def forward_response_to_client():
                try:
                    while True:
                        data = proxy2_sock.recv(BUFFER_SIZE)
                        if not data:
                            break
                        secured_sock.sendall(data)
                except Exception as e:
                    print(f"[ERROR] {e}")


            threading.Thread(target=forward_response_to_client).start()

            proxy2_sock.settimeout(PROXY2_TIMEOUT)
        else:
            print("[ERROR] Unsupported request method!")
            client_sock.close()

    except socket.timeout:
        print("[ERROR] Timeout while waiting for a response from Proxy2.")


    except Exception as e:
        print(f"[ERROR] {e}")
        client_sock.close()



def main():

    domain = "example.com"
    ca_key = "myCA.key"
    ca_cert = "myCA.pem"


    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    server_sock.bind((IP, PORT))
    server_sock.listen(5)
    print(f"[INFO] Listening on {IP}:{PORT}")

    while True:
        client_sock, client_address = server_sock.accept()
        print(f"[INFO] Accepted connection from {client_address[0]}:{client_address[1]}")
        client_handler = threading.Thread(target=handle_client, args=(client_sock,))
        client_handler.start()

if __name__ == "__main__":
    main()



Proxy2 script:

import socket
import threading
import ssl

BUFFER_SIZE = 8128 #4096
PORT = 3128

def handle_client(client_sock, client_addr):
    try:
        request = client_sock.recv(4096).decode('utf-8')
        print(f"Received request:\n{request}")

        # Add the X-Forwarded-For header
        headers_end = request.find("\r\n\r\n")
        if headers_end != -1:
            request = request[:headers_end] + f"\r\nX-Forwarded-For: {client_addr[0]}\r\n" + request[headers_end:]


        lines = request.split('\r\n')
        domain = None
        for line in lines:
            if line.startswith("Host:"):
                domain = line.split()[1]
                break

        if not domain:
            print("[ERROR] Host header not found.")
            return

        context = ssl.create_default_context()
        with context.wrap_socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM), server_hostname=domain) as server_sock:
            server_sock.connect((domain, 443))  # HTTPS-Port
            server_sock.sendall(request.encode())

            response = b""
            while True:
                chunk = server_sock.recv(BUFFER_SIZE)
                if not chunk:
                    break
                response += chunk

        print(f"Sending {len(response)} bytes back to Proxy1.")
        client_sock.sendall(response)

    except Exception as e:
        print(f"[ERROR] {e}")
    finally:
        client_sock.close()

def main(proxy_port):
    proxy_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    proxy_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    proxy_sock.bind(('0.0.0.0', proxy_port))
    proxy_sock.listen(5)

    print(f"Proxy 2 started on port {proxy_port}.")

    while True:
        client_sock, client_addr = proxy_sock.accept()
        threading.Thread(target=handle_client, args=(client_sock, client_addr)).start()

if __name__ == '__main__':
    proxy_port = PORT
    main(proxy_port)





To make the server certificate:

openssl req -x509 -newkey rsa:4096 -keyout server.key -out server.crt -days 365 -nodes



The certificate can also be produced differently, for initial testing purposes this should be sufficient...

My configuration:

Client: 192.168.0.34
Proxy1: 192.168.0.212

OPNsense: (IPS) 192.168.0.12
OPNsense: (IPS) 192.168.1.12

Proxy2: 192.168.1.212

Ok that with the gateway on in the net 192.168.1 I must look again more exactly, because in my constellation the traffic occurs then twice on the 192.168.1.12, this should be solved more elegantly

Ok an enterprise feature it is probably not yet, but I have the hope to find a better solution for it with the help of the community.

Even though SSL decryption has its drawbacks, I'm convinced that with the necessary caution it can add value.