From 8e02ed1a8bf93ed8be7b3c28f6742cc1c228bb56 Mon Sep 17 00:00:00 2001 From: hornet <43388240+hornetfighter515@users.noreply.github.com> Date: Wed, 19 Oct 2022 22:11:43 -0400 Subject: [PATCH] feature/button_input (#2) * Added keyboard functionality for existing features * Droid won't repeat most recent noise * Starting button input * bcm * async * no args * no arglist * asycnio * lgpio * asyncio * wait * async * asycn * remove wait * added keyboard interrup * params * test * got callbacks working * added keyboard control update * droid commands have movement * i do not honestly know what is different * added movement stub * added movement support? * string * format string * trying less space * added global * manual control * select_speed * stop * droid running * stupid multiple naming * play sound fixed * testing forward functionality * callback * move * rotate head * typo --- button_input.py | 131 ++++++++++++++++ droid.py | 373 +++++++++++++++++++++++--------------------- droid_commands.py | 89 +++++++++++ keyboard_control.py | 88 +++++++++++ 4 files changed, 504 insertions(+), 177 deletions(-) create mode 100644 button_input.py create mode 100644 droid_commands.py create mode 100644 keyboard_control.py diff --git a/button_input.py b/button_input.py new file mode 100644 index 0000000..28a7a9d --- /dev/null +++ b/button_input.py @@ -0,0 +1,131 @@ +from droid import Directions +import droid_commands as d +import asyncio +from time import sleep + +import lgpio as GPIO + +NOISE = 16 + +LEFT = 26 +DOWN = 27 +UP = 25 +RIGHT = 19 + +ROT_LEFT = 6 +ROT_RIGHT = 5 + + +todo = [] + + +def play_sound(handle, gpio, edge, time): + print(f"Playing sound at {time}") + todo.append(d.play_sound()) + + +def move(handle, gpio, edge, time): + # first, kill all other movements + todo.append(d.move_stop()) + + # then, check if this was a falling edge + if edge == 0: + return + + if gpio == RIGHT: + print("Pressed right") + todo.append(d.move_droid(right=True)) + elif gpio == LEFT: + print("Pressed left") + todo.append(d.move_droid(left=True)) + elif gpio == UP: + print("Pressed forward") + todo.append(d.move_droid(forward=True)) + elif gpio == DOWN: + todo.append(d.move_droid(backward=True)) + print("Pressed backward") + + +def move_head(handle, gpio, edge, time): + todo.append(d.stop_rotate_head()) + + if edge == 0: + return + + if gpio == ROT_LEFT: + print("Rotating left") + todo.append(d.rotate_head(Directions.ROTATE_LEFT)) + elif gpio == ROT_RIGHT: + print("Rotating right") + todo.append(d.rotate_head(Directions.ROTATE_RIGHT)) + + + +async def droid_connect(pi): + try: + await d.start_droid() + while True: + if len(todo) > 0: + await todo[0] + todo.pop(0) + except KeyboardInterrupt: + print("Thanks for using") + finally: + GPIO.gpiochip_close(pi) + await d.stop_droid() + + +async def main(): + pi = GPIO.gpiochip_open(0) + if pi < 0: + return + + GPIO.gpio_claim_alert(pi, NOISE, GPIO.RISING_EDGE) + GPIO.gpio_set_debounce_micros(pi, NOISE, 200) + + # feet + GPIO.gpio_claim_alert(pi, UP, GPIO.BOTH_EDGES) + GPIO.gpio_set_debounce_micros(pi, UP, 200) + + GPIO.gpio_claim_alert(pi, DOWN, GPIO.BOTH_EDGES) + GPIO.gpio_set_debounce_micros(pi, DOWN, 200) + + GPIO.gpio_claim_alert(pi, LEFT, GPIO.BOTH_EDGES) + GPIO.gpio_set_debounce_micros(pi, LEFT, 200) + + GPIO.gpio_claim_alert(pi, RIGHT, GPIO.BOTH_EDGES) + GPIO.gpio_set_debounce_micros(pi, RIGHT, 200) + + # head + GPIO.gpio_claim_alert(pi, ROT_LEFT, GPIO.BOTH_EDGES) + GPIO.gpio_set_debounce_micros(pi, ROT_LEFT, 200) + + GPIO.gpio_claim_alert(pi, ROT_RIGHT, GPIO.BOTH_EDGES) + GPIO.gpio_set_debounce_micros(pi, ROT_RIGHT, 200) + + + + cbs = [] + print("Configuring callbacks") + + # noise + cbs.append(GPIO.callback(pi, NOISE, func=play_sound)) + + # feet + cbs.append(GPIO.callback(pi, UP, func=move)) + cbs.append(GPIO.callback(pi, DOWN, func=move)) + cbs.append(GPIO.callback(pi, LEFT, func=move)) + cbs.append(GPIO.callback(pi, RIGHT, func=move)) + + # head + cbs.append(GPIO.callback(pi, ROT_LEFT, func=move_head)) + cbs.append(GPIO.callback(pi, ROT_RIGHT, func=move_head)) + + print("Callback configured") + await droid_connect(pi) + + + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/droid.py b/droid.py index 246e88e..ac950c6 100644 --- a/droid.py +++ b/droid.py @@ -1,177 +1,196 @@ -import asyncio -from time import sleep -from bleak import BleakScanner, BleakClient, BleakError - -class Droid(): - def __init__(self, profile): - print("Initializing") - self.disabledLeds = 0x00 - self.profile = profile - - # assumes theta in degrees and r = 0 to 100 % - # returns a tuple of percentages: (left_thrust, right_thrust) - def __throttle_angle_to_thrust__(r, theta): - theta = ((theta + 180) % 360) - 180 # normalize value to [-180, 180) - r = min(max(0, r), 100) # normalize value to [0, 100] - v_a = r * (45 - theta % 90) / 45 # falloff of main motor - v_b = min(100, 2 * r + v_a, 2 * r - v_a) # compensation of other motor - if theta < -90: return -v_b, -v_a - if theta < 0: return -v_a, v_b - if theta < 90: return v_b, v_a - return v_a, -v_b - - async def connect(self): - timeout=0.0 - print("Connecting") - self.droid = BleakClient(self.profile) - await self.droid.connect() - while not self.droid.is_connected and timeout < 10: - sleep (.1) - timeout += .1 - print ("Connected!") - connectCode = bytearray.fromhex("222001") - await self.droid.write_gatt_char(0x000d, connectCode, False) - await self.droid.write_gatt_char(0x000d, connectCode, False) - print("Locked") - light_blink = bytearray.fromhex("2C000449020001ff01ff0aff00") - await self.droid.write_gatt_char(0x000d, light_blink) - connect_sound = bytearray.fromhex("25000c421102") - await self.droid.write_gatt_char(0x000d, connect_sound) - sleep(3) - - async def disconnect(self): - print ("Disconnecting") - try: - soundBank = bytearray.fromhex("27420f4444001f09") - await self.droid.write_gatt_char(0x000d, soundBank) - soundSelection = bytearray.fromhex("27420f4444001800") - await self.droid.write_gatt_char(0x000d, soundSelection) - sleep(3) - finally: - await self.droid.disconnect() - print("Disconnected") - - async def led_disable_sound(self, leds): - ledDisableCommand = bytearray.fromhex(f"27420f4444004a{leds}") - await self.droid.write_gatt_char(0x000d, ledDisableCommand) - self.disabledLeds = self.disabledLeds|int(leds, 16) - print(self.disabledLeds) - - async def led_enable_sound(self, leds): - ledEnableCommand = bytearray.fromhex(f"27420f4444004b{leds}") - await self.droid.write_gatt_char(0x000d, ledEnableCommand) - self.disabledLeds = self.disabledLeds-(int(leds, 16)&self.disabledLeds) - print(self.disabledLeds) - - - async def led_flash(self, leds, duration): - pass - - async def led_off(self, leds): - ledOffCommand = bytearray.fromhex( f"27420f44440049{leds}" ) - await self.droid.write_gatt_char(0x000d, ledOffCommand) - print(f"{self.disabledLeds:02x}") - print((f"{(~self.disabledLeds & 0x1F):02x}")) - await self.led_enable_sound(f"{(~self.disabledLeds & 0x1F):02x}") - - - async def led_on(self, leds): - ledOnCommand = bytearray.fromhex(f"27420f44440048{leds}") - await self.droid.write_gatt_char(0x000d, ledOnCommand) - - def move (self, degrees, duration): - thrust = self.__throttle_angle_to_thrust__(degrees) - - async def play_sound(self, sound_id=None, bank_id=None, cycle=False, volume=None): - if volume: - self.set_volume(volume) - if bank_id and (not hasattr(self, "soundbank") or self.soundbank != bank_id): - await self.set_soundbank(bank_id) - if sound_id: - soundSelection = bytearray.fromhex("27420f44440018{}".format(sound_id)) - elif cycle: - soundSelection = bytearray.fromhex("26420f4344001c") - else: - soundSelection = bytearray.fromhex("27420f44440010{}".format(self.bank_id)) - await self.droid.write_gatt_char(0x000d, soundSelection) - - async def run_routine(self, routineId): - full_id = bytearray.fromhex("25000c42{}02".format(routineId)) - await self.droid.write_gatt_char(0x000d, full_id) - - async def set_soundbank(self, bank_id): - self.soundbank = bank_id - soundBank = bytearray.fromhex("27420f4444001f{}".format(bank_id)) - await self.droid.write_gatt_char(0x000d, soundBank) - - async def set_volume(self, volume): - volume_command = bytearray.fromhex("27420f4444000e{}".format(volume)) - await self.droid.write_gatt_char(0x000d, volume_command) - -def findDroid(candidate, data): - if candidate.name == "DROID": - return True - else: - return False - -async def discoverDroid(retry=False): - myDroid = None - - while retry and myDroid is None: - try: - myDroid = await BleakScanner.find_device_by_filter(findDroid) - if myDroid is None: - if not retry: - print("Droid discovery timed out.") - return - else: - print("Droid discovery timed out. Retrying...") - continue - except BleakError as err: - print("Droid discovery failed. Retrying...") - continue - - - print (f"Astromech successfully discovered: [ {myDroid} ]") - - d = Droid(myDroid) - return d - - - -async def main(): - - d = await discoverDroid(retry=True) - - try: - await d.connect() - sleep (3) - # await arms.run_routine("05") - # sleep (5) - # await arms.set_soundbank("05") - # await arms.play_sound("00") - # sleep (5) - # for i in range(5): - # await arms.play_sound(cycle=True) - # sleep(5) - # await arms.play_sound("00", "00") - # sleep(8) - await d.led_disable_sound("01") - await d.play_sound("00", "00") - sleep(10) - await d.led_on("1f") - sleep(10) - await d.led_off("1f") - await d.play_sound("00", "00") - sleep(10) - - except OSError as err: - print(f"Discovery failed due to operating system: {err}") - except BleakError as err: - print(f"Discovery failed due to Bleak: {err}") - - finally: - await d.disconnect() - -if __name__ == "__main__": - asyncio.run(main()) +import asyncio +from time import sleep +from bleak import BleakScanner, BleakClient, BleakError + +class Motors: + LEFT=0 + RIGHT=1 + HEAD=2 + +# Directions +class Directions: + FORWARD = 0 + BACKWARD = 8 + ROTATE_LEFT = 0 + ROTATE_RIGHT = 8 + +class Droid(): + + def __init__(self, profile): + print("Initializing") + self.disabledLeds = 0x00 + self.profile = profile + + # assumes theta in degrees and r = 0 to 100 % + # returns a tuple of percentages: (left_thrust, right_thrust) + def __throttle_angle_to_thrust__(r, theta): + theta = ((theta + 180) % 360) - 180 # normalize value to [-180, 180) + r = min(max(0, r), 100) # normalize value to [0, 100] + v_a = r * (45 - theta % 90) / 45 # falloff of main motor + v_b = min(100, 2 * r + v_a, 2 * r - v_a) # compensation of other motor + if theta < -90: return -v_b, -v_a + if theta < 0: return -v_a, v_b + if theta < 90: return v_b, v_a + return v_a, -v_b + + async def connect(self): + timeout=0.0 + print("Connecting") + self.droid = BleakClient(self.profile) + await self.droid.connect() + # while not self.droid.is_connected and timeout < 10: + # sleep (.1) + # timeout += .1 + print ("Connected!") + connectCode = bytearray.fromhex("222001") + await self.droid.write_gatt_char(0x000d, connectCode, False) + await self.droid.write_gatt_char(0x000d, connectCode, False) + print("Locked") + light_blink = bytearray.fromhex("2C000449020001ff01ff0aff00") + await self.droid.write_gatt_char(0x000d, light_blink) + connect_sound = bytearray.fromhex("25000c421102") + await self.droid.write_gatt_char(0x000d, connect_sound) + sleep(3) + + async def disconnect(self): + print ("Disconnecting") + try: + soundBank = bytearray.fromhex("27420f4444001f09") + await self.droid.write_gatt_char(0x000d, soundBank) + soundSelection = bytearray.fromhex("27420f4444001800") + await self.droid.write_gatt_char(0x000d, soundSelection) + sleep(3) + finally: + await self.droid.disconnect() + print("Disconnected") + + async def led_disable_sound(self, leds): + ledDisableCommand = bytearray.fromhex(f"27420f4444004a{leds}") + await self.droid.write_gatt_char(0x000d, ledDisableCommand) + self.disabledLeds = self.disabledLeds|int(leds, 16) + print(self.disabledLeds) + + async def led_enable_sound(self, leds): + ledEnableCommand = bytearray.fromhex(f"27420f4444004b{leds}") + await self.droid.write_gatt_char(0x000d, ledEnableCommand) + self.disabledLeds = self.disabledLeds-(int(leds, 16)&self.disabledLeds) + print(self.disabledLeds) + + + async def led_flash(self, leds, duration): + pass + + async def led_off(self, leds): + ledOffCommand = bytearray.fromhex( f"27420f44440049{leds}" ) + await self.droid.write_gatt_char(0x000d, ledOffCommand) + print(f"{self.disabledLeds:02x}") + print((f"{(~self.disabledLeds & 0x1F):02x}")) + await self.led_enable_sound(f"{(~self.disabledLeds & 0x1F):02x}") + + + async def led_on(self, leds): + ledOnCommand = bytearray.fromhex(f"27420f44440048{leds}") + await self.droid.write_gatt_char(0x000d, ledOnCommand) + + async def move (self, degrees, duration): + thrust = self.__throttle_angle_to_thrust__(degrees) + + + async def move_motors(self, direction, motor, strength): + move_selection = bytearray.fromhex("29420546{}{}{}012C0000".format(direction, motor, strength)) + await self.droid.write_gatt_char(0x000d, move_selection) + + + async def play_sound(self, sound_id=None, bank_id=None, cycle=False, volume=None): + if volume: + self.set_volume(volume) + if bank_id and (not hasattr(self, "soundbank") or self.soundbank != bank_id): + await self.set_soundbank(bank_id) + if sound_id: + soundSelection = bytearray.fromhex("27420f44440018{}".format(sound_id)) + elif cycle: + soundSelection = bytearray.fromhex("26420f4344001c") + else: + soundSelection = bytearray.fromhex("27420f44440010{}".format(self.bank_id)) + await self.droid.write_gatt_char(0x000d, soundSelection) + + async def run_routine(self, routineId): + full_id = bytearray.fromhex("25000c42{}02".format(routineId)) + await self.droid.write_gatt_char(0x000d, full_id) + + async def set_soundbank(self, bank_id): + self.soundbank = bank_id + soundBank = bytearray.fromhex("27420f4444001f{}".format(bank_id)) + await self.droid.write_gatt_char(0x000d, soundBank) + + async def set_volume(self, volume): + volume_command = bytearray.fromhex("27420f4444000e{}".format(volume)) + await self.droid.write_gatt_char(0x000d, volume_command) + +def findDroid(candidate, data): + if candidate.name == "DROID": + return True + else: + return False + +async def discoverDroid(retry=False): + myDroid = None + + while retry and myDroid is None: + try: + myDroid = await BleakScanner.find_device_by_filter(findDroid) + if myDroid is None: + if not retry: + print("Droid discovery timed out.") + return + else: + print("Droid discovery timed out. Retrying...") + continue + except BleakError as err: + print("Droid discovery failed. Retrying...") + continue + + + print (f"Astromech successfully discovered: [ {myDroid} ]") + + d = Droid(myDroid) + return d + + + +async def main(): + + d = await discoverDroid(retry=True) + + try: + await d.connect() + sleep (3) + # await arms.run_routine("05") + # sleep (5) + # await arms.set_soundbank("05") + # await arms.play_sound("00") + # sleep (5) + # for i in range(5): + # await arms.play_sound(cycle=True) + # sleep(5) + # await arms.play_sound("00", "00") + # sleep(8) + await d.led_disable_sound("01") + await d.play_sound("00", "00") + sleep(10) + await d.led_on("1f") + sleep(10) + await d.led_off("1f") + await d.play_sound("00", "00") + sleep(10) + + except OSError as err: + print(f"Discovery failed due to operating system: {err}") + except BleakError as err: + print(f"Discovery failed due to Bleak: {err}") + + finally: + await d.disconnect() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/droid_commands.py b/droid_commands.py new file mode 100644 index 0000000..57e92c5 --- /dev/null +++ b/droid_commands.py @@ -0,0 +1,89 @@ +import droid +import asyncio +from time import sleep +from random import randrange + +d = "" + + +lastSound = -1 +async def play_sound(): + global lastSound + global d + sound = lastSound + while sound == lastSound: + sound = randrange(0,5) + lastSound = sound + + while d is None: + sleep(0.1) + + await d.play_sound(f"0{sound}","00") + + +async def play_specific_sound(bank, sound): + global d + await d.play_sound( sound_id = sound, bank_id = bank) + + + +async def move_droid(forward=False, backward=False, left=False, right=False): + + l = droid.Directions.FORWARD + r = droid.Directions.FORWARD + global d + + + if forward: + l = droid.Directions.FORWARD + r = droid.Directions.FORWARD + if backward: + l = droid.Directions.BACKWARD + r = droid.Directions.BACKWARD + if left: + # spin left + l = droid.Directions.BACKWARD + r = droid.Directions.FORWARD + if right: + # spin right + l = droid.Directions.FORWARD + r = droid.Directions.BACKWARD + + await d.move_motors(l, droid.Motors.LEFT, "FF") + await d.move_motors(r, droid.Motors.RIGHT,"FF") + + +async def move_manually(direction, motor, strength): + global d + await d.move_motors(direction, motor, strength) + + +async def move_stop(): + global d + await move_manually(droid.Directions.FORWARD, droid.Motors.LEFT, "00") + await move_manually(droid.Directions.FORWARD, droid.Motors.RIGHT, "00") + + +async def rotate_head(direction): + global d + await d.move_motors(direction, droid.Motors.HEAD, "FF") + # eventually should we stop this call so we don't waste a bunch of battery? + +async def stop_rotate_head(): + global d + await d.move_motors(droid.Directions.ROTATE_LEFT, droid.Motors.HEAD, "00") + + +async def start_droid(): + """ + This function should be in a try loop + """ + global d + d = await droid.discoverDroid(retry=True) + await d.connect() + + +async def stop_droid(): + global d + await d.disconnect() + \ No newline at end of file diff --git a/keyboard_control.py b/keyboard_control.py new file mode 100644 index 0000000..38cafb9 --- /dev/null +++ b/keyboard_control.py @@ -0,0 +1,88 @@ +import asyncio +from re import L +from time import sleep +from random import randrange +import droid_commands as d +from droid import Directions + + + +runningDroid = True + +async def select_noise(d): + bank = input("\tBank> ") + sound = input("\tSound> ") + await d.play_specific_sound(bank, sound) + + +async def play_sound(d): + await d.play_sound() + +async def select_speed(d): + direction = input("\tDirection> ") + motor = input("\tMotor> ") + speed = input("\tSpeed> ") + await d.move_manually(direction, motor, speed) + +async def forward(d): + await d.move_droid(Directions.FORWARD) + +async def backward(d): + await d.move_droid(Directions.BACKWARD) + +async def left(d): + await d.move_droid(Directions.LEFT) + +async def right(d): + await d.move_droid(Directions.RIGHT) + +async def move_stop(d): + await d.move_stop() + + + +async def quit(d): + global runningDroid + runningDroid = False + +async def main(): + # first, get droid + await d.start_droid() + + commands = { + "w": forward, + "a": left, + "s": backward, + "d": right, + "x": move_stop, + "z": select_speed, + #"e": rotate counter-clockwise, + #"r": rotate clockwise, + #"f": special effect + "n": play_sound, + "m": select_noise, + "q": quit + } + try: + global runningDroid + while runningDroid: + # next, await input + command = input("Please input a command > ") + # next, parse that command + c = command.lower()[0:1] + if c in commands.keys(): + await commands[c](d) + else: + print("Command does not exist.") + + + sleep(0.2) + + except Exception as e: + print(e) + + finally: + await d.stop_droid() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file