216 lines
6.3 KiB
Python
216 lines
6.3 KiB
Python
|
"""
|
||
|
Creates hub for distributed IoT
|
||
|
|
||
|
Websocket Server docs: https://github.com/Pithikos/python-websocket-server
|
||
|
|
||
|
@author hornetfighter515
|
||
|
@author Lachezar Todorov
|
||
|
"""
|
||
|
|
||
|
import threading
|
||
|
from websocket_server import WebsocketServer
|
||
|
import time
|
||
|
import json
|
||
|
from random import random
|
||
|
|
||
|
iot_devs = {}
|
||
|
e_clients = {}
|
||
|
i_client = None
|
||
|
name=''
|
||
|
|
||
|
ext_srv = None
|
||
|
int_srv = None
|
||
|
iot_srv = None
|
||
|
|
||
|
class Client:
|
||
|
def __init__(self, ws):
|
||
|
self.new = True
|
||
|
self.ws = ws
|
||
|
self.n = None
|
||
|
self.e = None
|
||
|
|
||
|
|
||
|
def _log_prefix(t, client=None):
|
||
|
if client is None:
|
||
|
return str(time.time()) + ' [' + t + ':srv] '
|
||
|
return str(time.time()) + ' [' + t + ':' + str(client['id']) + '] '
|
||
|
|
||
|
# #
|
||
|
#################### INCOMING MESSAGES ########################################
|
||
|
# #
|
||
|
|
||
|
def _load_msg(message):
|
||
|
try:
|
||
|
msg = json.loads(message)
|
||
|
except:
|
||
|
msg = message
|
||
|
return msg
|
||
|
|
||
|
|
||
|
def _pub_key(cli, msg):
|
||
|
try:
|
||
|
cli.n = msg['pub_key']['n']
|
||
|
cli.e = msg['pub_key']['e']
|
||
|
return True
|
||
|
except:
|
||
|
print('\tMessage had no attribute pub_key. Failed.')
|
||
|
return False
|
||
|
|
||
|
|
||
|
def _cli_pubs(coll):
|
||
|
res = {}
|
||
|
if len(coll) == 0:
|
||
|
return res
|
||
|
for c, cli in coll.items():
|
||
|
res[c] = {
|
||
|
"n":cli.n,
|
||
|
"e":cli.e
|
||
|
}
|
||
|
return res
|
||
|
|
||
|
|
||
|
def ext_msg(client, server, message):
|
||
|
# external coming in to network
|
||
|
print(_log_prefix('ext', client), 'sent message.')
|
||
|
cli = e_clients[client['id']]
|
||
|
msg = _load_msg(message)
|
||
|
if cli.new:
|
||
|
if _pub_key(cli, msg):
|
||
|
# give IoTs or cli
|
||
|
if len(iot_devs) > 0:
|
||
|
print('\tThere are IoTs to send to.')
|
||
|
else:
|
||
|
print('\tCommunicate directly with client.')
|
||
|
cli.new = False
|
||
|
int_srv.send_message(i_client.ws, json.dumps({
|
||
|
"type":"pub_key",
|
||
|
"available_cli":{
|
||
|
"n":cli.n,
|
||
|
"e":cli.e
|
||
|
}
|
||
|
}))
|
||
|
ext_srv.send_message(client, json.dumps({
|
||
|
"type":"pub_key",
|
||
|
"name":name,
|
||
|
"available_cli":{
|
||
|
"n":i_client.n,
|
||
|
"e":i_client.e
|
||
|
},
|
||
|
"available_iots": _cli_pubs(iot_devs)
|
||
|
}))
|
||
|
else:
|
||
|
print(f'\tMessage {msg["id"]} goes to {msg["to"]}')
|
||
|
if msg['to'] == 0:
|
||
|
print('\tTo internal client')
|
||
|
int_srv.send_message(i_client.ws, message)
|
||
|
else:
|
||
|
iot_srv.send_message(iot_devs[msg['to']].ws, message)
|
||
|
print('\tTo IoT device')
|
||
|
|
||
|
|
||
|
def int_msg(client, server, message):
|
||
|
# internal leaving network
|
||
|
print(_log_prefix('int', client), 'Sent message.' )
|
||
|
msg = _load_msg(message)
|
||
|
if msg['type'] == 'pub_key':
|
||
|
if i_client.new:
|
||
|
if _pub_key(i_client, msg):
|
||
|
print(f'\tinternal client successfully authed.')
|
||
|
i_client.new = False
|
||
|
return
|
||
|
elif msg['type'] == 'message':
|
||
|
global e_clients
|
||
|
to = msg['to'] + 1
|
||
|
ext_srv.send_message(e_clients[to].ws, message)
|
||
|
print('\tMessage sent to external client')
|
||
|
|
||
|
|
||
|
def iot_msg(client, server, message):
|
||
|
# Partially decrypted message coming back from IoT devices
|
||
|
print(_log_prefix('iot', client), 'sent message.')
|
||
|
iot = iot_devs[client['id']]
|
||
|
msg = _load_msg(message)
|
||
|
if iot.new:
|
||
|
if _pub_key(iot, msg):
|
||
|
print(f'\tIoT {client["id"]} successfully authed.')
|
||
|
iot.new = False
|
||
|
else:
|
||
|
if i_client is not None:
|
||
|
int_srv.send_message(i_client.ws, message)
|
||
|
print('\tInternal client safe to send to.')
|
||
|
else:
|
||
|
print('\tInternal client disconnected.')
|
||
|
|
||
|
# #
|
||
|
######################### NEW CONNECTIONS #####################################
|
||
|
# #
|
||
|
|
||
|
def ext_new(client, server):
|
||
|
global e_clients
|
||
|
if client['id'] not in e_clients:
|
||
|
e_clients[client['id']] = Client(client)
|
||
|
print(_log_prefix('ext', client) + 'Client joined.')
|
||
|
# give list of iots and their pubkeys
|
||
|
|
||
|
def int_new(client, server):
|
||
|
print(_log_prefix('int', client) + 'Client joined.')
|
||
|
global i_client
|
||
|
if i_client is None:
|
||
|
i_client = Client(client)
|
||
|
print('\tInternal client set.')
|
||
|
|
||
|
def iot_new(client, server):
|
||
|
global iot_devs
|
||
|
if client['id'] not in iot_devs:
|
||
|
iot_devs[client['id']] = Client(client)
|
||
|
print(_log_prefix('iot', client) + 'Device joined.')
|
||
|
|
||
|
|
||
|
def start_server(conf, new, msg, prefix):
|
||
|
ws = WebsocketServer(host=conf['url'], port=conf['port'])
|
||
|
ws.set_fn_new_client(new)
|
||
|
ws.set_fn_message_received(msg)
|
||
|
wst = threading.Thread(target=ws.run_forever)
|
||
|
wst.daemon = True
|
||
|
wst.start()
|
||
|
print(_log_prefix(prefix), 'Running...')
|
||
|
return ws
|
||
|
|
||
|
def start_servers(ext_conf, int_conf, iot_conf):
|
||
|
"""
|
||
|
Starts a server for each type of device.
|
||
|
Why create a different server for each client type? Separation of concerns
|
||
|
is easier this way. Messages don't need to be sorted as heavily by which
|
||
|
client they came from. Internal ports can more easily be firewalled, or not
|
||
|
exposed to the outside internet at all.
|
||
|
@param host: the host it is being run on
|
||
|
@param port_e: the externally exposed port for clients outside the network
|
||
|
@param port_i: the internal port for clients to connect to
|
||
|
@param port_iot: the port for IoT devices to connect to
|
||
|
"""
|
||
|
# websocket server exposed, or external, for external clients
|
||
|
global ext_srv
|
||
|
ext_srv = start_server(ext_conf, ext_new, ext_msg, 'ext')
|
||
|
|
||
|
# websocket server internal
|
||
|
global int_srv
|
||
|
int_srv = start_server(int_conf, int_new, int_msg, 'int')
|
||
|
|
||
|
# websocket server for IoT devices
|
||
|
global iot_srv
|
||
|
iot_srv = start_server(iot_conf, iot_new, iot_msg, 'iot')
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
import yaml
|
||
|
with open('src/config.yml', 'r') as f:
|
||
|
conf = yaml.safe_load(f)
|
||
|
ext_conf = conf['ws']['ext']
|
||
|
int_conf = conf['ws']['int']
|
||
|
iot_conf = conf['ws']['iot']
|
||
|
start_servers(ext_conf, int_conf, iot_conf)
|
||
|
while True:
|
||
|
time.sleep(1)
|
||
|
|
||
|
|