Hello,
I've been dabbling a bit with Python and have developed a proxy environment in which traffic is first decrypted, then inspected by an IPS, and finally re-encrypted.
The whole scenario works by means of an SSL-terminating proxy (proxy1), which is placed upstream of the IPS. The downstream proxy (proxy2) independently establishes an HTTPS/SSL connection to the destination, for example a web server, and transmits the response back unencrypted to proxy1.
With this configuration the data stream can be examined: I succeeded in having Suricata IPS detect an EICAR test file that was being downloaded via HTTPS and block the download automatically.
This solution is still in the alpha stage and there are still problems, especially with SSL pinning, so not all websites can be accessed without further ado. Furthermore, there may be errors in the display of images, videos or other content.
Schema:
(//)
Proxy1 script:
import socket
import ssl
import threading
import subprocess
import os
# Address and port on which Proxy1 listens for client connections.
IP = '0.0.0.0'
PORT = 3128
# Certificate and private key loaded for the TLS session with the client.
CERT_FILE = 'server.crt'
KEY_FILE = 'server.key'
# Address and port of Proxy2, to which the decrypted request is sent.
PROXY2_IP = '192.168.1.212'
PROXY2_PORT = 3128
PROXY2_TIMEOUT = 60 # socket timeout for the Proxy2 connection, in seconds
# Maximum number of bytes read per recv() call.
BUFFER_SIZE = 8128
def handle_client(client_sock):
    """Terminate TLS for one client and relay its request through Proxy2.

    The client is expected to open the conversation with an HTTP ``CONNECT``
    request.  After answering it, the connection is upgraded to TLS using
    CERT_FILE/KEY_FILE, one decrypted request is read and sent in plain text
    to Proxy2, and a background thread copies Proxy2's response back to the
    client over the TLS session.

    Args:
        client_sock: Connected TCP socket returned by ``accept()``.

    All errors are reported on stdout; nothing is raised to the caller.
    Every socket opened here is closed either by this function (on failure)
    or by the relay thread (once the response has been delivered).
    """
    secured_sock = None
    proxy2_sock = None
    handed_off = False  # True once the relay thread owns the sockets
    try:
        request = client_sock.recv(BUFFER_SIZE)
        if not request.startswith(b"CONNECT"):
            print("[ERROR] Unsupported request method!")
            return

        client_sock.sendall(b"HTTP/1.1 200 Connection Established\r\n\r\n")

        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(certfile=CERT_FILE, keyfile=KEY_FILE)
        secured_sock = context.wrap_socket(client_sock, server_side=True)

        request = secured_sock.recv(BUFFER_SIZE)
        print(request)
        if not request:
            # The client closed the tunnel without sending anything.
            return

        proxy2_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Set the timeout before connecting so that it covers connect() as
        # well as every later recv(), and cannot race with the relay thread.
        proxy2_sock.settimeout(PROXY2_TIMEOUT)
        proxy2_sock.connect((PROXY2_IP, PROXY2_PORT))
        proxy2_sock.sendall(request)

        threading.Thread(
            target=_relay_response, args=(proxy2_sock, secured_sock)
        ).start()
        handed_off = True
    except socket.timeout:
        print("[ERROR] Timeout while waiting for a response from Proxy2.")
    except Exception as e:
        print(f"[ERROR] {e}")
    finally:
        if not handed_off:
            # close() is idempotent, so closing client_sock is safe even
            # after wrap_socket() has taken over its file descriptor.
            for sock in (proxy2_sock, secured_sock, client_sock):
                if sock is not None:
                    sock.close()


def _relay_response(proxy2_sock, secured_sock):
    """Copy Proxy2's response to the TLS client, then close both sockets."""
    try:
        while True:
            data = proxy2_sock.recv(BUFFER_SIZE)
            if not data:
                break
            secured_sock.sendall(data)
    except socket.timeout:
        print("[ERROR] Timeout while waiting for a response from Proxy2.")
    except Exception as e:
        print(f"[ERROR] {e}")
    finally:
        proxy2_sock.close()
        secured_sock.close()
def main():
    """Accept client connections forever, serving each on its own thread.

    Binds a TCP listening socket to IP:PORT and starts one ``handle_client``
    thread per accepted connection.  The listening socket is closed if the
    accept loop is left through an exception (e.g. ``KeyboardInterrupt``).
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_sock:
        # Allow an immediate restart without waiting for TIME_WAIT to expire.
        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_sock.bind((IP, PORT))
        server_sock.listen(5)
        print(f"[INFO] Listening on {IP}:{PORT}")
        while True:
            client_sock, client_address = server_sock.accept()
            print(f"[INFO] Accepted connection from {client_address[0]}:{client_address[1]}")
            client_handler = threading.Thread(target=handle_client, args=(client_sock,))
            client_handler.start()


if __name__ == "__main__":
    main()
Proxy2 script:
import socket
import threading
import ssl
# Maximum number of bytes read per recv() from the upstream server.
BUFFER_SIZE = 8128 # NOTE(review): 4096 is noted as an alternative value
# TCP port on which Proxy2 listens.
PORT = 3128
def handle_client(client_sock, client_addr):
    """Forward one plain-text HTTP request to its origin server over TLS.

    Reads a request from ``client_sock``, adds an ``X-Forwarded-For`` header
    carrying the peer address, opens a verified TLS connection to the host
    named in the ``Host`` header (port 443 unless the header names another
    port) and streams the server's response back to ``client_sock``.

    The request is handled as bytes throughout, so bodies that are not valid
    UTF-8 are forwarded unchanged.

    Args:
        client_sock: Connected socket carrying the unencrypted request.
        client_addr: ``(host, port)`` tuple of the peer, as returned by
            ``accept()``.

    All errors are reported on stdout; ``client_sock`` is always closed.
    """
    try:
        request = client_sock.recv(BUFFER_SIZE)
        # Decode only for logging; undecodable bytes must not abort the request.
        printable = request.decode('utf-8', errors='replace')
        print(f"Received request:\n{printable}")

        request = _add_forwarded_for(request, client_addr[0])
        target = _parse_host(request)
        if target is None:
            print("[ERROR] Host header not found.")
            return
        domain, port = target

        context = ssl.create_default_context()
        with context.wrap_socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM), server_hostname=domain) as server_sock:
            server_sock.connect((domain, port))
            server_sock.sendall(request)
            # Stream each chunk as it arrives instead of buffering the whole
            # response, which was quadratic and unbounded in memory.
            total = 0
            while True:
                chunk = server_sock.recv(BUFFER_SIZE)
                if not chunk:
                    break
                client_sock.sendall(chunk)
                total += len(chunk)
            print(f"Sent {total} bytes back to Proxy1.")
    except Exception as e:
        print(f"[ERROR] {e}")
    finally:
        client_sock.close()


def _add_forwarded_for(request, client_ip):
    """Return ``request`` with an X-Forwarded-For header appended to its head."""
    head, terminator, rest = request.partition(b"\r\n\r\n")
    if not terminator:
        # No complete header section; leave the data untouched.
        return request
    # Insert before the blank line so no extra CRLF leaks into the body.
    forwarded = b"\r\nX-Forwarded-For: " + client_ip.encode("ascii")
    return head + forwarded + terminator + rest


def _parse_host(request):
    """Return ``(host, port)`` from the Host header, or None if it is absent."""
    head = request.partition(b"\r\n\r\n")[0]
    # Skip the request line; header names are case-insensitive.
    for line in head.split(b"\r\n")[1:]:
        name, colon, value = line.partition(b":")
        if not colon or name.strip().lower() != b"host":
            continue
        value = value.strip().decode("latin-1")
        if not value:
            return None
        host, colon, port_text = value.rpartition(":")
        if colon and host and port_text.isdecimal():
            return host, int(port_text)
        return value, 443
    return None
def main(proxy_port):
    """Listen on every interface at ``proxy_port`` and serve clients forever.

    Each accepted connection is handed to ``handle_client`` on a new thread,
    together with the peer address reported by ``accept()``.
    """
    listen_address = ('0.0.0.0', proxy_port)

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(listen_address)
    listener.listen(5)
    print(f"Proxy 2 started on port {proxy_port}.")

    while True:
        # accept() yields (socket, address), exactly the handler's arguments.
        connection = listener.accept()
        worker = threading.Thread(target=handle_client, args=connection)
        worker.start()


if __name__ == '__main__':
    proxy_port = PORT
    main(proxy_port)
To make the server certificate:
openssl req -x509 -newkey rsa:4096 -keyout server.key -out server.crt -days 365 -nodes
The certificate can also be produced differently, for initial testing purposes this should be sufficient...
My configuration:
Client: 192.168.0.34
Proxy1: 192.168.0.212
OPNsense: (IPS) 192.168.0.12
OPNsense: (IPS) 192.168.1.12
Proxy2: 192.168.1.212
I still need to take a closer look at the gateway in the 192.168.1.x network, because in my setup the traffic then passes through 192.168.1.12 twice; this should be solved more elegantly.
OK, it is probably not an enterprise feature yet, but I hope to find a better solution for it with the help of the community.
Even though SSL decryption has its drawbacks, I'm convinced that with the necessary caution it can add value.