Writing test websocket client

This commit is contained in:
hornet 2021-10-19 14:08:05 -04:00
parent 9bd87725d3
commit bdd2c0f91d
2 changed files with 77 additions and 0 deletions

34
src/client/cli.py Normal file
View File

@ -0,0 +1,34 @@
import websocket
import _thread
import time
import ssl
def on_message(ws, message):
print(f"<< {message}")
def on_error(ws, error):
print(error)
def on_close(ws, close_status_code, close_msg):
print(f"### closed. reason: {close_msg} ###")
def on_open(ws):
def run(*args):
running = True
while running:
outbound = input(">")
if outbound == 'q':
running = False
else:
ws.send(outbound)
ws.close()
_thread.start_new_thread(run, ())
if __name__ == "__main__":
websocket.enableTrace(True)
ws = websocket.WebSocketApp("wss://hornetfighter.com:9443",
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

43
src/client/test-cli.py Normal file
View File

@ -0,0 +1,43 @@
import websocket
import _thread
import ssl
import json
def on_message(ws, message):
print(f"<< {message}")
def on_error(ws, error):
print(error)
def on_close(ws, close_status_code, close_msg):
print(f"### closed. reason: {close_msg} ###")
def on_open(ws):
def run(*args):
running = True
uname = input('input username:')
msg = {
"user":uname,
"content":None
}
ws.send(json.dumps(msg))
while running:
outbound = input(">")
if outbound == 'q':
running = False
else:
msg = {
"user":uname,
"content":outbound
}
ws.send(json.dumps(msg))
ws.close()
_thread.start_new_thread(run, ())
if __name__ == "__main__":
ws = websocket.WebSocketApp("wss://hornetfighter.com:9443",
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})