dist-iot-net/src/hub.py
2021-12-02 10:01:08 -05:00

234 lines
6.8 KiB
Python

"""
Creates hub for distributed IoT
Websocket Server docs: https://github.com/Pithikos/python-websocket-server
@author hornetfighter515
@author Lachezar Todorov
"""
import threading
from websocket_server import WebsocketServer
import time
import json
from random import random
iot_devs = {}
e_clients = {}
i_client = None
name=''
ext_srv = None
int_srv = None
iot_srv = None
class Client:
def __init__(self, ws):
self.new = True
self.ws = ws
self.n = None
self.e = None
def _log_prefix(t, client=None):
if client is None:
return str(time.time()) + ' [' + t + ':srv] '
return str(time.time()) + ' [' + t + ':' + str(client['id']) + '] '
# #
#################### INCOMING MESSAGES ########################################
# #
def _load_msg(message):
try:
msg = json.loads(message)
except:
msg = message
return msg
def _pub_key(cli, msg):
try:
cli.n = msg['pub_key']['n']
cli.e = msg['pub_key']['e']
return True
except:
print('\tMessage had no attribute pub_key. Failed.')
return False
timers = {}
def timer_start(msg):
# the ID is always between 0 and 1
now = time.time()
name = msg['to'] + msg['id']
timers[name] = now
def timer_stop(msg):
now = time.time()
name = msg['from'] + msg['id']
if name in timers:
with open('time_differences', 'a') as f:
f.write(f'{msg["from"]},{now - timers[name]}')
def _cli_pubs(coll):
res = {}
if len(coll) == 0:
return res
for c, cli in coll.items():
res[c] = {
"n":cli.n,
"e":cli.e
}
return res
def ext_msg(client, server, message):
# external coming in to network
print(_log_prefix('ext', client), 'sent message.')
cli = e_clients[client['id']]
msg = _load_msg(message)
if cli.new:
if _pub_key(cli, msg):
# give IoTs or cli
if len(iot_devs) > 0:
print('\tThere are IoTs to send to.')
else:
print('\tCommunicate directly with client.')
cli.new = False
int_srv.send_message(i_client.ws, json.dumps({
"type":"pub_key",
"available_cli":{
"n":cli.n,
"e":cli.e
}
}))
ext_srv.send_message(client, json.dumps({
"type":"pub_key",
"name":name,
"available_cli":{
"n":i_client.n,
"e":i_client.e
},
"available_iots": _cli_pubs(iot_devs)
}))
else:
print(f'\tMessage {msg["id"]} goes to {msg["to"]}')
if msg['to'] == 0:
print('\tTo internal client')
int_srv.send_message(i_client.ws, message)
else:
timer_start(msg)
iot_srv.send_message(iot_devs[msg['to']].ws, message)
print('\tTo IoT device')
def int_msg(client, server, message):
# internal leaving network
print(_log_prefix('int', client), 'Sent message.' )
msg = _load_msg(message)
if msg['type'] == 'pub_key':
if i_client.new:
if _pub_key(i_client, msg):
print(f'\tinternal client successfully authed.')
i_client.new = False
return
elif msg['type'] == 'message':
global e_clients
to = msg['to'] + 1
ext_srv.send_message(e_clients[to].ws, message)
print('\tMessage sent to external client')
def iot_msg(client, server, message):
# Partially decrypted message coming back from IoT devices
print(_log_prefix('iot', client), 'sent message.')
iot = iot_devs[client['id']]
msg = _load_msg(message)
if iot.new:
if _pub_key(iot, msg):
print(f'\tIoT {client["id"]} successfully authed.')
iot.new = False
else:
if i_client is not None:
timer_stop(msg)
int_srv.send_message(i_client.ws, message)
print('\tInternal client safe to send to.')
else:
print('\tInternal client disconnected.')
# #
######################### NEW CONNECTIONS #####################################
# #
def ext_new(client, server):
global e_clients
if client['id'] not in e_clients:
e_clients[client['id']] = Client(client)
print(_log_prefix('ext', client) + 'Client joined.')
# give list of iots and their pubkeys
def int_new(client, server):
print(_log_prefix('int', client) + 'Client joined.')
global i_client
if i_client is None:
i_client = Client(client)
print('\tInternal client set.')
def iot_new(client, server):
global iot_devs
if client['id'] not in iot_devs:
iot_devs[client['id']] = Client(client)
print(_log_prefix('iot', client) + 'Device joined.')
def start_server(conf, new, msg, prefix):
ws = WebsocketServer(host=conf['url'], port=conf['port'])
ws.set_fn_new_client(new)
ws.set_fn_message_received(msg)
wst = threading.Thread(target=ws.run_forever)
wst.daemon = True
wst.start()
print(_log_prefix(prefix), 'Running...')
return ws
def start_servers(ext_conf, int_conf, iot_conf):
"""
Starts a server for each type of device.
Why create a different server for each client type? Separation of concerns
is easier this way. Messages don't need to be sorted as heavily by which
client they came from. Internal ports can more easily be firewalled, or not
exposed to the outside internet at all.
@param host: the host it is being run on
@param port_e: the externally exposed port for clients outside the network
@param port_i: the internal port for clients to connect to
@param port_iot: the port for IoT devices to connect to
"""
# websocket server exposed, or external, for external clients
global ext_srv
ext_srv = start_server(ext_conf, ext_new, ext_msg, 'ext')
# websocket server internal
global int_srv
int_srv = start_server(int_conf, int_new, int_msg, 'int')
# websocket server for IoT devices
global iot_srv
iot_srv = start_server(iot_conf, iot_new, iot_msg, 'iot')
if __name__ == "__main__":
import yaml
with open('src/config.yml', 'r') as f:
conf = yaml.safe_load(f)
ext_conf = conf['ws']['ext']
int_conf = conf['ws']['int']
iot_conf = conf['ws']['iot']
start_servers(ext_conf, int_conf, iot_conf)
while True:
time.sleep(1)