droidDepotPythonControl/droid.py
hornet 8e02ed1a8b
feature/button_input (#2)
* Added keyboard functionality for existing features

* Droid won't repeat most recent noise

* Starting button input

* bcm

* async

* no args

* no arglist

* asycnio

* lgpio

* asyncio

* wait

* async

* asycn

* remove wait

* added keyboard interrup

* params

* test

* got callbacks working

* added keyboard control update

* droid commands have movement

* i do not honestly know what is different

* added movement stub

* added movement support?

* string

* format string

* trying less space

* added global

* manual control

* select_speed

* stop

* droid running

* stupid multiple naming

* play sound fixed

* testing forward functionality

* callback

* move

* rotate head

* typo
2022-10-19 22:11:43 -04:00

197 lines
6.8 KiB
Python

import asyncio
from time import sleep
from bleak import BleakScanner, BleakClient, BleakError
class Motors:
    """Numeric identifiers for the droid's three motors."""

    # NOTE(review): presumably passed as the ``motor`` argument of
    # Droid.move_motors -- confirm against callers.
    LEFT, RIGHT, HEAD = 0, 1, 2
# Directions
class Directions:
    """Numeric direction codes; drive and rotation codes share values."""

    # NOTE(review): presumably passed as the ``direction`` argument of
    # Droid.move_motors -- confirm against callers.
    FORWARD = ROTATE_LEFT = 0
    BACKWARD = ROTATE_RIGHT = 8
class Droid():
    """Controller for one droid reached through a bleak ``BleakClient``.

    Every command is a hex-encoded packet written to GATT handle 0x000d.
    LED masks, sound ids, bank ids, routine ids and volumes are supplied as
    hex strings (for example ``"1f"``) and spliced directly into the packet,
    so they must contain an even number of hex digits.
    """

    # GATT handle that every command packet in this class is written to.
    COMMAND_HANDLE = 0x000d

    def __init__(self, profile):
        """Store the device to connect to; no I/O happens here.

        Args:
            profile: Passed unchanged to ``BleakClient`` by ``connect``.
        """
        print("Initializing")
        # Bitmask of the LEDs handed to led_disable_sound and not yet
        # handed back to led_enable_sound.
        self.disabledLeds = 0x00
        self.profile = profile

    @staticmethod
    def __throttle_angle_to_thrust__(r, theta):
        """Convert a throttle/heading pair into per-side thrust.

        Declared as a staticmethod so it can be called through an instance
        without the instance being bound to ``r``.

        Args:
            r: Throttle in percent; clamped to [0, 100].
            theta: Heading in degrees; wrapped into [-180, 180).

        Returns:
            A tuple of percentages ``(left_thrust, right_thrust)``.
        """
        theta = ((theta + 180) % 360) - 180  # normalize value to [-180, 180)
        r = min(max(0, r), 100)  # normalize value to [0, 100]
        v_a = r * (45 - theta % 90) / 45  # falloff of main motor
        v_b = min(100, 2 * r + v_a, 2 * r - v_a)  # compensation of other motor
        if theta < -90:
            return -v_b, -v_a
        if theta < 0:
            return -v_a, v_b
        if theta < 90:
            return v_b, v_a
        return v_a, -v_b

    async def connect(self):
        """Open the BLE connection and send the start-up packets.

        Creates ``self.droid`` (a ``BleakClient``), connects, writes the
        connect code, then the light-blink and connect-sound packets, and
        finally waits three seconds without blocking the event loop.
        """
        print("Connecting")
        self.droid = BleakClient(self.profile)
        await self.droid.connect()
        print("Connected!")
        connectCode = bytearray.fromhex("222001")
        # The connect code is deliberately written twice, as before.
        # NOTE(review): presumably required by the droid -- confirm.
        await self.droid.write_gatt_char(self.COMMAND_HANDLE, connectCode, False)
        await self.droid.write_gatt_char(self.COMMAND_HANDLE, connectCode, False)
        print("Locked")
        light_blink = bytearray.fromhex("2C000449020001ff01ff0aff00")
        await self.droid.write_gatt_char(self.COMMAND_HANDLE, light_blink)
        connect_sound = bytearray.fromhex("25000c421102")
        await self.droid.write_gatt_char(self.COMMAND_HANDLE, connect_sound)
        # asyncio.sleep instead of time.sleep: a blocking sleep would stall
        # the event loop that services the BLE connection.
        await asyncio.sleep(3)

    async def disconnect(self):
        """Send the farewell sound packets, then always close the connection."""
        print("Disconnecting")
        try:
            soundBank = bytearray.fromhex("27420f4444001f09")
            await self.droid.write_gatt_char(self.COMMAND_HANDLE, soundBank)
            soundSelection = bytearray.fromhex("27420f4444001800")
            await self.droid.write_gatt_char(self.COMMAND_HANDLE, soundSelection)
            await asyncio.sleep(3)
        finally:
            # Runs even if the farewell packets could not be written.
            await self.droid.disconnect()
            print("Disconnected")

    async def led_disable_sound(self, leds):
        """Send the 0x4a LED command and record ``leds`` as disabled.

        Args:
            leds: LED bitmask as a hex string.
        """
        ledDisableCommand = bytearray.fromhex(f"27420f4444004a{leds}")
        await self.droid.write_gatt_char(self.COMMAND_HANDLE, ledDisableCommand)
        self.disabledLeds |= int(leds, 16)
        print(self.disabledLeds)

    async def led_enable_sound(self, leds):
        """Send the 0x4b LED command and clear ``leds`` from the disabled mask.

        Args:
            leds: LED bitmask as a hex string.
        """
        ledEnableCommand = bytearray.fromhex(f"27420f4444004b{leds}")
        await self.droid.write_gatt_char(self.COMMAND_HANDLE, ledEnableCommand)
        self.disabledLeds &= ~int(leds, 16)
        print(self.disabledLeds)

    async def led_flash(self, leds, duration):
        """Placeholder; not implemented yet."""
        pass

    async def led_off(self, leds):
        """Send the 0x49 LED command, then re-enable the non-disabled LEDs.

        After the write, ``led_enable_sound`` is called with the complement
        of ``self.disabledLeds`` limited to the low five bits.

        Args:
            leds: LED bitmask as a hex string.
        """
        ledOffCommand = bytearray.fromhex(f"27420f44440049{leds}")
        await self.droid.write_gatt_char(self.COMMAND_HANDLE, ledOffCommand)
        print(f"{self.disabledLeds:02x}")
        still_enabled = f"{(~self.disabledLeds & 0x1F):02x}"
        print(still_enabled)
        await self.led_enable_sound(still_enabled)

    async def led_on(self, leds):
        """Send the 0x48 LED command for ``leds`` (hex-string bitmask)."""
        ledOnCommand = bytearray.fromhex(f"27420f44440048{leds}")
        await self.droid.write_gatt_char(self.COMMAND_HANDLE, ledOnCommand)

    async def move(self, degrees, duration, throttle=100):
        """Compute the thrust pair for a heading (movement stub).

        This is still a stub: it does not drive the motors and ``duration``
        is not used yet. The thrust helper used to be called with the
        heading only, which raised TypeError; it now receives both values.

        Args:
            degrees: Heading in degrees.
            duration: Reserved for the future movement implementation.
            throttle: Throttle in percent, defaults to full throttle.

        Returns:
            The ``(left_thrust, right_thrust)`` tuple for the heading.
        """
        thrust = self.__throttle_angle_to_thrust__(throttle, degrees)
        return thrust

    async def move_motors(self, direction, motor, strength):
        """Write a motor packet built from ``direction``, ``motor``, ``strength``.

        The three values are formatted with ``str.format`` into the hex
        packet, so together they must keep the hex string an even length.
        """
        move_selection = bytearray.fromhex(
            "29420546{}{}{}012C0000".format(direction, motor, strength)
        )
        await self.droid.write_gatt_char(self.COMMAND_HANDLE, move_selection)

    async def play_sound(self, sound_id=None, bank_id=None, cycle=False, volume=None):
        """Optionally set volume and sound bank, then send a sound packet.

        Args:
            sound_id: Hex-string id sent with the 0x18 command when given.
            bank_id: Hex-string bank id; the bank is switched only when it
                differs from the one last set through ``set_soundbank``.
            cycle: When true and no ``sound_id`` is given, send the fixed
                cycle packet instead.
            volume: Hex-string volume; set before anything is played.

        Raises:
            ValueError: Neither ``sound_id`` nor ``cycle`` was given and no
                sound bank has been selected yet.
        """
        if volume:
            # Must be awaited; otherwise the coroutine is created and
            # dropped without the volume packet ever being written.
            await self.set_volume(volume)
        if bank_id and getattr(self, "soundbank", None) != bank_id:
            await self.set_soundbank(bank_id)
        if sound_id:
            soundSelection = bytearray.fromhex("27420f44440018{}".format(sound_id))
        elif cycle:
            soundSelection = bytearray.fromhex("26420f4344001c")
        else:
            current_bank = getattr(self, "soundbank", None)
            if current_bank is None:
                raise ValueError("play_sound needs a sound_id, cycle=True or a selected sound bank")
            soundSelection = bytearray.fromhex("27420f44440010{}".format(current_bank))
        await self.droid.write_gatt_char(self.COMMAND_HANDLE, soundSelection)

    async def run_routine(self, routineId):
        """Write the routine packet for ``routineId`` (hex string)."""
        full_id = bytearray.fromhex("25000c42{}02".format(routineId))
        await self.droid.write_gatt_char(self.COMMAND_HANDLE, full_id)

    async def set_soundbank(self, bank_id):
        """Remember ``bank_id`` as the current bank and send the 0x1f packet."""
        self.soundbank = bank_id
        soundBank = bytearray.fromhex("27420f4444001f{}".format(bank_id))
        await self.droid.write_gatt_char(self.COMMAND_HANDLE, soundBank)

    async def set_volume(self, volume):
        """Send the 0x0e packet carrying ``volume`` (hex string)."""
        volume_command = bytearray.fromhex("27420f4444000e{}".format(volume))
        await self.droid.write_gatt_char(self.COMMAND_HANDLE, volume_command)
def findDroid(candidate, data):
    """Scanner filter: accept only devices whose name is exactly "DROID".

    ``data`` is part of the filter signature but is not inspected.
    """
    return candidate.name == "DROID"
async def discoverDroid(retry=False):
    """Scan for a droid and wrap the discovered device in a ``Droid``.

    Previously the scan loop was guarded by ``retry``, so the default
    ``retry=False`` never scanned at all and returned ``Droid(None)``.
    A scan is now always attempted at least once.

    Args:
        retry: When true, keep scanning after a timeout or a ``BleakError``
            until a droid is found. When false, give up after one attempt.

    Returns:
        A ``Droid`` for the discovered device, or ``None`` when ``retry`` is
        false and the single attempt timed out or failed.
    """
    while True:
        try:
            myDroid = await BleakScanner.find_device_by_filter(findDroid)
        except BleakError as err:
            if not retry:
                print(f"Droid discovery failed: {err}")
                return None
            print("Droid discovery failed. Retrying...")
            continue
        if myDroid is not None:
            break
        if not retry:
            print("Droid discovery timed out.")
            return None
        print("Droid discovery timed out. Retrying...")
    print(f"Astromech successfully discovered: [ {myDroid} ]")
    return Droid(myDroid)
async def main():
    """Demo sequence: discover a droid, connect, exercise LEDs and sound.

    All pauses use ``asyncio.sleep`` so the event loop keeps running while
    the BLE connection is open; a blocking ``time.sleep`` would stall it.
    The droid is always disconnected at the end, even after an error.
    """
    d = await discoverDroid(retry=True)
    try:
        await d.connect()
        await asyncio.sleep(3)
        await d.led_disable_sound("01")
        await d.play_sound("00", "00")
        await asyncio.sleep(10)
        await d.led_on("1f")
        await asyncio.sleep(10)
        await d.led_off("1f")
        await d.play_sound("00", "00")
        await asyncio.sleep(10)
    except OSError as err:
        print(f"Discovery failed due to operating system: {err}")
    except BleakError as err:
        print(f"Discovery failed due to Bleak: {err}")
    finally:
        await d.disconnect()
# Script entry point: run the demo coroutine only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())