"""Raw-socket IP packet sniffing helpers."""

import os
import socket

from .ip_ctype import IP_ctype

# Upper bound for a single read: the IPv4 total-length field is 16 bits wide.
_MAX_PACKET_SIZE = 65535
# Number of leading bytes of each packet handed to IP_ctype for parsing.
_IP_HEADER_SIZE = 20


def _is_windows():
    """Return True when running on Windows (``os.name == 'nt'``)."""
    return os.name == 'nt'


def _open_sniffer(host):
    """Create, bind and configure a raw sniffing socket on *host*.

    On Windows the socket uses ``IPPROTO_IP`` and is switched into
    promiscuous mode via ``SIO_RCVALL``; elsewhere it uses ``IPPROTO_ICMP``.
    If any setup step fails the socket is closed before the error is
    re-raised, so a failed call never leaks a descriptor.
    """
    if _is_windows():
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
    try:
        sniffer.bind((host, 0))
        sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
        # turn on promiscuous mode
        if _is_windows():
            sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
    except BaseException:
        sniffer.close()
        raise
    return sniffer


def _close_sniffer(sniffer):
    """Turn promiscuous mode off (Windows only) and close *sniffer*."""
    try:
        if _is_windows():
            sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
    finally:
        # Close even if the ioctl itself fails.
        sniffer.close()


def udp_single_packet_sniff(host):
    """Receive one packet on *host* and print the raw ``recvfrom`` result.

    The printed value is the ``(data, address)`` tuple returned by
    ``socket.recvfrom``. The socket is always cleaned up, even if the
    read raises.
    """
    sniffer = _open_sniffer(host)
    try:
        print(sniffer.recvfrom(_MAX_PACKET_SIZE))
    finally:
        _close_sniffer(sniffer)


def sniff(args):
    """Sniff packets until interrupted, reporting each one to a callback.

    Args:
        args: A ``(host, event_callback)`` tuple. ``host`` is the address to
            bind to; ``event_callback`` is called with one formatted string
            per packet.

    Returns:
        ``'Packets Sniffed: <n>'`` once a ``KeyboardInterrupt`` stops the
        loop, where ``<n>`` counts the packets successfully reported.

    Any other exception propagates to the caller; in every case promiscuous
    mode is switched off (Windows) and the socket is closed.
    """
    host, event_callback = args
    sniffer = _open_sniffer(host)
    packets_sniffed = 0
    try:
        while True:
            # read a packet
            raw_buffer = sniffer.recvfrom(_MAX_PACKET_SIZE)[0]
            # parse that packet
            ip_header = IP_ctype(raw_buffer[:_IP_HEADER_SIZE])
            try:
                event_callback(f'Protocol: {ip_header.protocol}\t{ip_header.src_address} -> {ip_header.dst_address}')
            except AttributeError:
                # Best effort: skip packets whose header fields (or whose
                # callback handling) raise AttributeError, without counting.
                continue
            packets_sniffed += 1
    except KeyboardInterrupt:
        return f'Packets Sniffed: {packets_sniffed}'
    finally:
        _close_sniffer(sniffer)


def main():
    """Prompt for a host IP, sniff one packet, then sniff continuously."""
    host = input('Host IP > ')
    udp_single_packet_sniff(host)
    sniff((host, print))


if __name__ == '__main__':
    main()