Browse Source

Formatting and type sig cleanup

Kirk Trombley 1 year ago
parent
commit
dcdc655c19

+ 13 - 7
commands/commands/hangguy.py

@@ -1,4 +1,3 @@
-from typing import Optional
 from datetime import datetime
 from base64 import b64decode
 from uuid import uuid4
@@ -123,8 +122,8 @@ GUY_STAGES = [
 
 @initialize_data
 class HangGuy:
-    puzzle: Optional[str] = None
-    game_state: Optional[str] = None
+    puzzle: str | None = None
+    game_state: str | None = None
     bad_guesses: str = ""
     guy_state: int = 0
     guy_lifetime: int = 0
@@ -192,7 +191,9 @@ async def hangguy(
     alert_channel: Config("hangguy.alert_channel"),
 ):
     if len(cmd.args) == 0:
-        RollbotFailure.INVALID_ARGUMENTS.raise_exc(detail="Must provide subcommand or guess")
+        RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+            detail="Must provide subcommand or guess"
+        )
 
     game = await store.load_or(KEY)
     is_active = game.is_active()
@@ -216,7 +217,8 @@ async def hangguy(
                         prefix = "Bad guess!"
                     else:
                         game.game_state = "".join(
-                            guess if i in find else s for i, s in enumerate(game.game_state)
+                            guess if i in find else s
+                            for i, s in enumerate(game.game_state)
                         )
                         prefix = f"Great! '{guess}' appears {len(find)} time{'' if len(find) == 1 else 's'}!"
             elif game.puzzle.split() == guess.split():
@@ -231,13 +233,17 @@ async def hangguy(
             if game.is_finished():
                 game.end_game()
                 game.guy_lifetime += 1
-                txt += f"\nThe game is over, and the guy lives on!\n{game.survival_msg()}"
+                txt += (
+                    f"\nThe game is over, and the guy lives on!\n{game.survival_msg()}"
+                )
 
             if game.is_dead():
                 game.end_game()
                 dead = DeadGuy.from_guy(game)
                 game.reset_guy()
-                await dead_store.save(f"{uuid4().hex}-{datetime.now().isoformat()}", dead_guy)
+                await dead_store.save(
+                    f"{uuid4().hex}-{datetime.now().isoformat()}", dead_guy
+                )
                 txt += f"\noh god oh fuck, the guy is dead!\nYou failed to guess {game.puzzle}\nThe guy has been buried."
 
             await store.save(KEY, game)

+ 209 - 63
commands/commands/rollcoin.py

@@ -1,5 +1,4 @@
 from dataclasses import dataclass, field
-from typing import Optional
 from logging import Logger
 import random
 import json
@@ -8,7 +7,22 @@ import asyncio
 from sudoku_py import SudokuGenerator, Sudoku
 
 from rollbot import as_command, initialize_data, RollbotFailure, Attachment, Response
-from rollbot.injection import Data, Sender, Config, Const, Arg, Reply, OriginAdmin, Args, Attachments, Origin, Request, Lazy, Channel, SenderName
+from rollbot.injection import (
+    Data,
+    Sender,
+    Config,
+    Const,
+    Arg,
+    Reply,
+    OriginAdmin,
+    Args,
+    Attachments,
+    Origin,
+    Request,
+    Lazy,
+    Channel,
+    SenderName,
+)
 
 # View
 #   !wallet - shows your number of rollcoins, NFTs (Non-Functional Tamagotchis), and market balance
@@ -36,7 +50,7 @@ from rollbot.injection import Data, Sender, Config, Const, Arg, Reply, OriginAdm
 @dataclass
 class RollcoinState:
     treasury: float
-    mining_puzzle: Optional[list[list[int]]]
+    mining_puzzle: list[list[int | None]]
     market_state: int
     appraised: list[str] = field(default_factory=list)
 
@@ -44,7 +58,7 @@ class RollcoinState:
 @initialize_data
 @dataclass
 class RollcoinWallet:
-    balance: float = 10 # new wallets start with 10 coins
+    balance: float = 10  # new wallets start with 10 coins
     holdings: float = 0
     cost_basis: float = 0
     nfts: list[str] = field(default_factory=list)
@@ -69,7 +83,11 @@ class RollcoinWallet:
             for rarity in [NFT_MAX_RARITY, *NFT_RARITY[1:]]
         }
         counts[NFT_RARITY[0]] = num_nfts - sum(counts.values())
-        return "\n\t".join(f"{rarity.title()}: {c} ({100 * c / num_nfts:.02f}%)" for rarity in [*NFT_RARITY, NFT_MAX_RARITY] if (c := counts[rarity]) > 0)
+        return "\n\t".join(
+            f"{rarity.title()}: {c} ({100 * c / num_nfts:.02f}%)"
+            for rarity in [*NFT_RARITY, NFT_MAX_RARITY]
+            if (c := counts[rarity]) > 0
+        )
 
 
 SPECIAL_AMOUNTS = {
@@ -88,7 +106,9 @@ def convert_amount(amount):
 # constants
 GLOBAL_STATE_KEY = "ROLLCOIN_GLOBAL_STATE"
 NAME_URL = "https://randommer.io/api/Name?nameType=firstname&quantity=1"
-ICON_URL = "https://app.pixelencounter.com/api/basic/svgmonsters/image/png?size=256&fillType=2"
+ICON_URL = (
+    "https://app.pixelencounter.com/api/basic/svgmonsters/image/png?size=256&fillType=2"
+)
 NFT_COLORS = ["red", "blue", "yellow", "green", "orange", "purple", "brown", "gray"]
 NFT_UNCOMMON = "silver"
 NFT_RARE = "gold"
@@ -102,7 +122,9 @@ NFT_RARITY = ["common", "uncommon", "rare!", "ultra rare!"]
 NFT_MAX_RARITY = "legendary!"
 
 # injection values
-State = Data(RollcoinState).For(Const(GLOBAL_STATE_KEY), treasury=0, mining_puzzle=None, market_state=0)
+State = Data(RollcoinState).For(
+    Const(GLOBAL_STATE_KEY), treasury=0, mining_puzzle=None, market_state=0
+)
 SenderWallet = Data(RollcoinWallet).For(Sender)
 WalletLookup = Config("rollcoin.wallet_names")
 MarketTransitions = Config("rollcoin.market.transitions")
@@ -120,7 +142,10 @@ def wallet(sender_wallet: SenderWallet, reply: Reply):
     """
     View the contents of your wallet. Using this command once will initialize your wallet.
     """
-    return f"You currently own...\n{sender_wallet}\n\t{sender_wallet.get_nft_ratios()}".strip(), reply
+    return (
+        f"You currently own...\n{sender_wallet}\n\t{sender_wallet.get_nft_ratios()}".strip(),
+        reply,
+    )
 
 
 @as_command
@@ -128,19 +153,26 @@ def nfts(sender_wallet: SenderWallet, reply: Reply):
     """
     View the full list of your NFTs.
     """
-    nfts = '\n\t'.join(sender_wallet.nfts)
+    nfts = "\n\t".join(sender_wallet.nfts)
     return f"You currently own...\n\t{nfts}".strip(), reply
 
 
 @as_command
-async def blockchain(origin: Origin, wallets: Data(RollcoinWallet), wallet_lookup: WalletLookup, state: State):
+async def blockchain(
+    origin: Origin,
+    wallets: Data(RollcoinWallet),
+    wallet_lookup: WalletLookup,
+    state: State,
+):
     """
     View the contents of everyone's wallets
     """
     response = f"Blockchain:\n\tTreasury: {state.treasury}\n\n"
     names = {v: k.title() for k, v in wallet_lookup[origin].items()}
     wallets = [(sender_id, wallet) async for (sender_id, wallet) in wallets.all()]
-    for (sender_id, wallet) in sorted(wallets, key=lambda p: p[1].get_valuation(), reverse=True):
+    for sender_id, wallet in sorted(
+        wallets, key=lambda p: p[1].get_valuation(), reverse=True
+    ):
         if (name := names.get(sender_id, None)) is not None:
             response += f"{name}:\n"
             response += "\n".join("\t" + s for s in str(wallet).split("\n")) + "\n\n"
@@ -149,13 +181,18 @@ async def blockchain(origin: Origin, wallets: Data(RollcoinWallet), wallet_looku
 
 @as_command
 async def tip(
-    origin: Origin, 
+    origin: Origin,
     wallets: Data(RollcoinWallet),
     sender_id: Sender,
     sender_wallet: SenderWallet,
     wallet_lookup: WalletLookup,
     target_name: Arg(0, missing_msg="You must tell me who to tip!"),
-    amount: Arg(1, convert=convert_amount, missing_msg="You must provide an amount to tip!", fail_msg="Could not parse {} as value"),
+    amount: Arg(
+        1,
+        convert=convert_amount,
+        missing_msg="You must provide an amount to tip!",
+        fail_msg="Could not parse {} as value",
+    ),
 ):
     """
     Send RollCoins to another person, as in !tip [person] [amount]
@@ -171,10 +208,14 @@ async def tip(
         return f"Sorry! You only have {sender_wallet.balance} RollCoins available - try mining for more!"
 
     if amount <= 0:
-        RollbotFailure.INVALID_ARGUMENTS.raise_exc(f"Amount must be positive, not {amount}")
+        RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+            f"Amount must be positive, not {amount}"
+        )
 
     if (target_id := wallet_lookup[origin].get(target_name.lower(), None)) is None:
-        RollbotFailure.INVALID_ARGUMENTS.raise_exc(f"Cannot find wallet belonging to {target_name}")
+        RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+            f"Cannot find wallet belonging to {target_name}"
+        )
 
     target_wallet = await wallets.load_or(target_id)
     sender_wallet.balance -= amount
@@ -191,7 +232,12 @@ async def donate(
     sender_id: Sender,
     sender_wallet: SenderWallet,
     wallet_store: Data(RollcoinWallet),
-    amount: Arg(0, convert=convert_amount, missing_msg="You must provide an amount to donate!", fail_msg="Could not parse {} as value"),
+    amount: Arg(
+        0,
+        convert=convert_amount,
+        missing_msg="You must provide an amount to donate!",
+        fail_msg="Could not parse {} as value",
+    ),
 ):
     """
     Donate RollCoins to everyone who has a wallet, the number of coins you specify will be divided amongst everyone else.
@@ -207,14 +253,16 @@ async def donate(
         return f"Sorry! You only have {sender_wallet.balance} RollCoins available - try mining for more!"
 
     if amount <= 0:
-        RollbotFailure.INVALID_ARGUMENTS.raise_exc(f"Amount must be positive, not {amount}")
+        RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+            f"Amount must be positive, not {amount}"
+        )
 
     sender_wallet.balance -= amount
     await wallet_store.save(sender_id, sender_wallet)
 
     wallets = [(wid, w) async for wid, w in wallet_store.all() if wid != sender_id]
     to_donate = amount / len(wallets)
-    for (wallet_id, wallet) in wallets:
+    for wallet_id, wallet in wallets:
         wallet.balance += to_donate
         await wallet_store.save(wallet_id, wallet)
 
@@ -222,7 +270,9 @@ async def donate(
 
 
 async def evolve_market(state, transitions, multipliers, wallet_store, state_store):
-    state.market_state = random.choices(range(len(transitions)), weights=transitions[state.market_state], k=1)[0]
+    state.market_state = random.choices(
+        range(len(transitions)), weights=transitions[state.market_state], k=1
+    )[0]
     await state_store.save(GLOBAL_STATE_KEY, state)
     multiplier = multipliers[state.market_state]
     async for (wallet_id, wallet) in wallet_store.all():
@@ -240,7 +290,12 @@ async def bet(
     transitions: MarketTransitions,
     multipliers: MarketMultipliers,
     messages: MarketMessages,
-    amount: Arg(0, convert=convert_amount, missing_msg="You must provide an amount to bet!", fail_msg="Could not parse {} as value"),
+    amount: Arg(
+        0,
+        convert=convert_amount,
+        missing_msg="You must provide an amount to bet!",
+        fail_msg="Could not parse {} as value",
+    ),
     reply: Reply,
 ):
     """
@@ -259,7 +314,9 @@ async def bet(
         return
 
     if amount <= 0:
-        RollbotFailure.INVALID_ARGUMENTS.raise_exc(f"Amount must be positive, not {amount}")
+        RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+            f"Amount must be positive, not {amount}"
+        )
 
     sender_wallet.balance -= amount
     sender_wallet.holdings += amount
@@ -284,11 +341,16 @@ async def cash(
     transitions: MarketTransitions,
     multipliers: MarketMultipliers,
     messages: MarketMessages,
-    amount: Arg(0, convert=convert_amount, missing_msg="You must provide an amount to bet!", fail_msg="Could not parse {} as value"),
+    amount: Arg(
+        0,
+        convert=convert_amount,
+        missing_msg="You must provide an amount to bet!",
+        fail_msg="Could not parse {} as value",
+    ),
     reply: Reply,
 ):
     """
-    Cash some number of RollCoins out of the market. 
+    Cash some number of RollCoins out of the market.
     Because RollCoins are the one true fiat currency, you can only cash out up to the number of coins available in the treasury.
     """
     if not isinstance(amount, float):
@@ -304,7 +366,9 @@ async def cash(
         return
 
     if amount <= 0:
-        RollbotFailure.INVALID_ARGUMENTS.raise_exc(f"Amount must be positive, not {amount}")
+        RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+            f"Amount must be positive, not {amount}"
+        )
 
     if state.treasury == 0:
         yield f"Sorry! The treasury is actually empty right now so uh... no one can sell..."
@@ -343,6 +407,7 @@ async def mine(
     """
     Trade solved Sudokus for RollCoins. Admins can skip the current puzzle with !mine skip
     """
+
     async def generate_puzzle():
         exchange = list(zip("abcdefghi", range(10)))
         random.shuffle(exchange)
@@ -361,7 +426,9 @@ async def mine(
 
     if "skip" in args.strip(" !/").lower():
         if not admin:
-            RollbotFailure.PERMISSIONS.raise_exc("Only admins can skip the current mining challenge")
+            RollbotFailure.PERMISSIONS.raise_exc(
+                "Only admins can skip the current mining challenge"
+            )
         await generate_puzzle()
         yield f"Admin has skipped the previous mining challenge. The new challenge is:\n{Sudoku(board=state.mining_puzzle)}"
         return
@@ -403,7 +470,9 @@ async def mine(
 
     for i in range(0, 9, 3):
         for j in range(0, 9, 3):
-            if set(x for r in parsed[i:i+3] for x in r[j:j+3]) != set(range(1, 10)):
+            if set(x for r in parsed[i : i + 3] for x in r[j : j + 3]) != set(
+                range(1, 10)
+            ):
                 yield "Sorry, that solution isn't valid!", reply
                 yield f"The current mining challenge is\n{Sudoku(board=state.mining_puzzle)}"
                 return
@@ -415,7 +484,7 @@ async def mine(
     sender_wallet = await get_sender_wallet()
     sender_wallet.balance += value
     await wallet_store.save(sender_id, sender_wallet)
-    
+
     # allow generate_puzzle to save the state
     state.treasury += value / 10
     await generate_puzzle()
@@ -443,9 +512,10 @@ async def appraise(
     """
     Use this command while replying to a popular post, and I will purchase it for some RollCoins!
     """
+
     def make_response(text):
         return Response(
-            origin_id=origin, 
+            origin_id=origin,
             channel_id=appraise_channel or received_channel,
             text=text,
         )
@@ -456,11 +526,21 @@ async def appraise(
             target_msg = json.loads(await reply_attachment.body())
         except:
             logger.exception("Failed appraisal, logging for debugging")
-            RollbotFailure.INVALID_ARGUMENTS.raise_exc("Reply to a message to have it appraised, I could not read that one")
+            RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+                "Reply to a message to have it appraised, I could not read that one"
+            )
         appraisal_id = f"GROUPME-{target_msg['group_id']}-{target_msg['id']}"
         if require_ownership and target_msg["sender_id"] != sender_id:
-            RollbotFailure.INVALID_ARGUMENTS.raise_exc("You can only appraise your own posts!")
-        score_base = len([t for t in target_msg["favorited_by"] if allow_self_likes or t != target_msg["sender_id"]])
+            RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+                "You can only appraise your own posts!"
+            )
+        score_base = len(
+            [
+                t
+                for t in target_msg["favorited_by"]
+                if allow_self_likes or t != target_msg["sender_id"]
+            ]
+        )
     elif origin == "DISCORD":
         try:
             target_msg = next(a for a in attachments if a.name == "reply").body
@@ -471,19 +551,29 @@ async def appraise(
                         score_base += 1
         except:
             logger.exception("Failed appraisal, logging for debugging")
-            RollbotFailure.INVALID_ARGUMENTS.raise_exc("Reply to a message to have it appraised, I could not read that one")
+            RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+                "Reply to a message to have it appraised, I could not read that one"
+            )
         appraisal_id = f"DISCORD-{target_msg.channel.id}-{target_msg.id}"
         if require_ownership and str(target_msg.author.id) != sender_id:
-            RollbotFailure.INVALID_ARGUMENTS.raise_exc("You can only appraise your own posts!")
+            RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+                "You can only appraise your own posts!"
+            )
     # other origins can be handled here
     else:
-        RollbotFailure.INVALID_COMMAND.raise_exc(f"Message appraisal is not implemented in this platform (origin was {origin})")
+        RollbotFailure.INVALID_COMMAND.raise_exc(
+            f"Message appraisal is not implemented in this platform (origin was {origin})"
+        )
 
     if appraisal_id in state.appraised:
-        return make_response(f"Sorry {sender_name}, I've already purchased that message!")
+        return make_response(
+            f"Sorry {sender_name}, I've already purchased that message!"
+        )
 
     if score_base <= 2:
-        return make_response(f"Sorry {sender_name}, I don't think that message is worth very much...")
+        return make_response(
+            f"Sorry {sender_name}, I don't think that message is worth very much..."
+        )
 
     value = round(abs(random.gauss(score_base * 50, 15)), 2)
     sender_wallet = await get_sender_wallet()
@@ -492,12 +582,16 @@ async def appraise(
     state.appraised.append(appraisal_id)
     await state_store.save(GLOBAL_STATE_KEY, state)
 
-    return make_response(f"That post is very interesting, {sender_name}!\nI will purchase it for {value} RollCoins!\nThat brings your balance to {sender_wallet.balance}")
+    return make_response(
+        f"That post is very interesting, {sender_name}!\nI will purchase it for {value} RollCoins!\nThat brings your balance to {sender_wallet.balance}"
+    )
 
 
 def validate_pulls(pulls, balance):
     if not (1 <= pulls <= 10):
-        RollbotFailure.INVALID_ARGUMENTS.raise_exc("Number of pulls must be between 1 and 10")
+        RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+            "Number of pulls must be between 1 and 10"
+        )
 
     if balance < pulls:
         return f"You only have {balance} RollCoins available, and gacha pulls cost one each!"
@@ -505,13 +599,13 @@ def validate_pulls(pulls, balance):
 
 def pull_gacha_color():
     pull = random.randint(1, 100)
-    if pull == 100: # 1%
+    if pull == 100:  # 1%
         return NFT_ULTRA
-    if pull >= 90: # 10%
+    if pull >= 90:  # 10%
         return NFT_RARE
-    if pull >= 70: # 20%
+    if pull >= 70:  # 20%
         return NFT_UNCOMMON
-    return random.choice(NFT_COLORS) # nice%
+    return random.choice(NFT_COLORS)  # nice%
 
 
 async def pull_gacha(req, name_api_key, logger):
@@ -521,11 +615,15 @@ async def pull_gacha(req, name_api_key, logger):
     try:
         async with req.get(NAME_URL, headers={"X-Api-Key": name_api_key}) as res:
             name = (await res.json())[0]
-        async with req.get(ICON_URL + f"&primaryColor={color1}&secondaryColor={color2}") as res:
+        async with req.get(
+            ICON_URL + f"&primaryColor={color1}&secondaryColor={color2}"
+        ) as res:
             img = Attachment(name="image", body=await res.read())
     except:
         logger.exception("Failed gacha, logging for debugging")
-        RollbotFailure.SERVICE_DOWN.raise_exc("Failed to pull from gachapon, your coin has not been deducted")
+        RollbotFailure.SERVICE_DOWN.raise_exc(
+            "Failed to pull from gachapon, your coin has not been deducted"
+        )
 
     # calculate rarity
     rarity = (NFT_RARITY_LOOKUP.get(color1, 0) + NFT_RARITY_LOOKUP.get(color2, 0)) // 2
@@ -555,11 +653,12 @@ async def run_gacha_pulls(receiver_name, req, name_api_key, sleep_time, logger,
             yield f"Failed to pull from gachapon! You will not be charged the coin for this pull.",
 
 
-
 @as_command
 async def gacha(
     sender_wallet: SenderWallet,
-    get_sender_wallet: Lazy(SenderWallet), # used for re-querying to mitigate race condition
+    get_sender_wallet: Lazy(
+        SenderWallet
+    ),  # used for re-querying to mitigate race condition
     sender_id: Sender,
     wallet_store: Data(RollcoinWallet),
     get_state: Lazy(State),
@@ -568,7 +667,13 @@ async def gacha(
     logger: Logger,
     req: Request,
     name_api_key: NameAPIKey,
-    pulls: Arg(0, convert=int, required=False, default=1, fail_msg="Number of pulls must be an integer, not {}"),
+    pulls: Arg(
+        0,
+        convert=int,
+        required=False,
+        default=1,
+        fail_msg="Number of pulls must be an integer, not {}",
+    ),
     reply: Reply,
 ):
     """
@@ -580,10 +685,12 @@ async def gacha(
         return
 
     pulled = []
-    async for result in run_gacha_pulls("You", req, name_api_key, sleep_time, logger, pulls):
+    async for result in run_gacha_pulls(
+        "You", req, name_api_key, sleep_time, logger, pulls
+    ):
         pulled.append(result[0])
         yield *result, reply
-    
+
     sender_wallet = await get_sender_wallet()
     sender_wallet.balance -= len(pulled)
     sender_wallet.nfts += pulled
@@ -596,9 +703,11 @@ async def gacha(
 
 @as_command
 async def giftcha(
-    origin: Origin, 
+    origin: Origin,
     sender_wallet: SenderWallet,
-    get_sender_wallet: Lazy(SenderWallet), # used for re-querying to mitigate race condition
+    get_sender_wallet: Lazy(
+        SenderWallet
+    ),  # used for re-querying to mitigate race condition
     sender_id: Sender,
     wallet_store: Data(RollcoinWallet),
     get_state: Lazy(State),
@@ -609,7 +718,13 @@ async def giftcha(
     name_api_key: NameAPIKey,
     wallet_lookup: WalletLookup,
     target_name: Arg(0, missing_msg="Must provide a target to tax"),
-    pulls: Arg(1, convert=int, required=False, default=1, fail_msg="Number of pulls must be an integer, not {}"),
+    pulls: Arg(
+        1,
+        convert=int,
+        required=False,
+        default=1,
+        fail_msg="Number of pulls must be an integer, not {}",
+    ),
     reply: Reply,
 ):
     """
@@ -620,13 +735,17 @@ async def giftcha(
         return
 
     if (target_id := wallet_lookup[origin].get(target_name.lower(), None)) is None:
-        RollbotFailure.INVALID_ARGUMENTS.raise_exc(f"Could not find a wallet for {target_name}")
+        RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+            f"Could not find a wallet for {target_name}"
+        )
 
     pulled = []
-    async for result in run_gacha_pulls(target_name.title(), req, name_api_key, sleep_time, logger, pulls):
+    async for result in run_gacha_pulls(
+        target_name.title(), req, name_api_key, sleep_time, logger, pulls
+    ):
         pulled.append(result[0])
         yield result
-    
+
     sender_wallet = await get_sender_wallet()
     sender_wallet.balance -= len(pulled)
     await wallet_store.save(sender_id, sender_wallet)
@@ -642,10 +761,16 @@ async def giftcha(
 
 # ADMIN COMMANDS
 
+
 @as_command
 async def deflate(
     origin_admin: OriginAdmin,
-    power: Arg(0, convert=int, missing_msg="Must provide power to deflate by", fail_msg="Power to deflate by must be an integer, not {}"),
+    power: Arg(
+        0,
+        convert=int,
+        missing_msg="Must provide power to deflate by",
+        fail_msg="Power to deflate by must be an integer, not {}",
+    ),
     wallet_store: Data(RollcoinWallet),
     state_store: Data(RollcoinState),
 ):
@@ -655,7 +780,7 @@ async def deflate(
     if not origin_admin:
         RollbotFailure.PERMISSIONS.raise_exc("Only admins can deflate the currency")
 
-    factor = 10 ** power
+    factor = 10**power
 
     state = await state_store.load(GLOBAL_STATE_KEY)
     state.treasury /= factor
@@ -673,7 +798,12 @@ async def deflate(
 @as_command
 async def brrr(
     origin_admin: OriginAdmin,
-    coins: Arg(0, convert=float, missing_msg="Must provide coins to mint", fail_msg="Coins to mint by must be a number, not {}"),
+    coins: Arg(
+        0,
+        convert=float,
+        missing_msg="Must provide coins to mint",
+        fail_msg="Coins to mint by must be a number, not {}",
+    ),
     state_store: Data(RollcoinState),
 ):
     """
@@ -683,7 +813,9 @@ async def brrr(
         RollbotFailure.PERMISSIONS.raise_exc("Only admins can mint coins")
 
     if coins <= 0:
-        RollbotFailure.INVALID_ARGUMENTS.raise_exc("Can only mint a positive number of coins")
+        RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+            "Can only mint a positive number of coins"
+        )
 
     state = await state_store.load(GLOBAL_STATE_KEY)
     state.treasury += coins
@@ -698,7 +830,12 @@ async def tax(
     origin_admin: OriginAdmin,
     wallet_lookup: WalletLookup,
     target_name: Arg(0, missing_msg="Must provide a target to tax"),
-    coins: Arg(1, convert=float, missing_msg="Must provide coins to tax", fail_msg="Coins to tax by must be a number, not {}"),
+    coins: Arg(
+        1,
+        convert=float,
+        missing_msg="Must provide coins to tax",
+        fail_msg="Coins to tax by must be a number, not {}",
+    ),
     wallet_store: Data(RollcoinWallet),
     state_store: Data(RollcoinState),
 ):
@@ -708,8 +845,12 @@ async def tax(
     if not origin_admin:
         RollbotFailure.PERMISSIONS.raise_exc("Only admins can tax someone")
 
-    if (target_id := wallet_lookup[origin].get(target_name.lower(), None)) is None or (wallet := await wallet_store.load(target_id)) is None:
-        RollbotFailure.INVALID_ARGUMENTS.raise_exc(f"Could not find a wallet for {target_name}")
+    if (target_id := wallet_lookup[origin].get(target_name.lower(), None)) is None or (
+        wallet := await wallet_store.load(target_id)
+    ) is None:
+        RollbotFailure.INVALID_ARGUMENTS.raise_exc(
+            f"Could not find a wallet for {target_name}"
+        )
 
     state = await state_store.load(GLOBAL_STATE_KEY)
     state.treasury += coins
@@ -723,7 +864,12 @@ async def tax(
 @as_command
 async def market(
     origin_admin: OriginAdmin,
-    target_state: Arg(0, convert=int, missing_msg="Must provide target state for market", fail_msg="Target market state must be an integer, not {}"),
+    target_state: Arg(
+        0,
+        convert=int,
+        missing_msg="Must provide target state for market",
+        fail_msg="Target market state must be an integer, not {}",
+    ),
     state_store: Data(RollcoinState),
     messages: MarketMessages,
 ):

+ 7 - 4
commands/commands/session.py

@@ -1,4 +1,3 @@
-from typing import Optional
 import datetime
 
 from rollbot import as_command, initialize_data, RollbotFailure
@@ -7,7 +6,7 @@ from rollbot.injection import Subcommand, Data, ChannelAdmin, OriginAdmin, Chann
 
 @initialize_data
 class DnDSession:
-    session_time: Optional[float] = None
+    session_time: float | None = None
 
     @staticmethod
     def fmt(sesh):
@@ -73,7 +72,9 @@ def parse_datetime(arg_text):
     minute = 45
 
     if arg_text is None:
-        return datetime.datetime(int(year), int(month), int(day), int(hour), int(minute))
+        return datetime.datetime(
+            int(year), int(month), int(day), int(hour), int(minute)
+        )
 
     time, arg_text = pop_arg(arg_text)
     time_parts = time.split(":")
@@ -84,7 +85,9 @@ def parse_datetime(arg_text):
     ampm, arg_text = pop_arg(arg_text)
 
     if ampm is None:
-        return datetime.datetime(int(year), int(month), int(day), int(hour), int(minute))
+        return datetime.datetime(
+            int(year), int(month), int(day), int(hour), int(minute)
+        )
 
     if ampm.lower() == "pm":
         hour = int(hour) + 12

+ 14 - 8
rollbot/rollbot/decorators/as_command.py

@@ -1,5 +1,5 @@
 from collections.abc import Callable, AsyncGenerator
-from typing import Union, Any
+from typing import Any
 from functools import wraps
 from logging import Logger
 import inspect
@@ -25,7 +25,9 @@ from .error_handling import with_failure_handling
 decorated_commands: dict[str, CommandType] = {}
 
 
-def _lift_command_fn(fn: Callable[..., Any]) -> Callable[..., AsyncGenerator[Any, None]]:
+def _lift_command_fn(
+    fn: Callable[..., Any]
+) -> Callable[..., AsyncGenerator[Any, None]]:
     if inspect.isasyncgenfunction(fn):
         lifted = fn
     elif inspect.iscoroutinefunction(fn):
@@ -77,17 +79,21 @@ def _get_injectors(fn: Callable[..., Any]) -> list[Injector]:
 
 
 def _is_valid_tuple_response(result: Any):
-    return (isinstance(result, tuple) and 
-            len(result) > 0 and 
-            isinstance(result[0], str) and 
-            all(isinstance(a, Attachment) for a in result[1:]))
+    return (
+        isinstance(result, tuple)
+        and len(result) > 0
+        and isinstance(result[0], str)
+        and all(isinstance(a, Attachment) for a in result[1:])
+    )
 
 
 def _make_response(message: Message, result: Any) -> Response:
     if result is None or isinstance(result, Response):
         return result
     if _is_valid_tuple_response(result):
-        return Response.from_message(message, text=result[0], attachments=list(result[1:]))
+        return Response.from_message(
+            message, text=result[0], attachments=list(result[1:])
+        )
     if isinstance(result, str):
         return Response.from_message(message, text=result)
     if isinstance(result, Attachment):
@@ -111,7 +117,7 @@ def _on_command_impl(name: str, fn: Callable[..., Any]) -> Callable[..., Any]:
     return fn
 
 
-def as_command(arg: Union[str, Callable]) -> Callable:
+def as_command(arg: str | Callable) -> Callable:
     if isinstance(arg, str):
         return lambda fn: _on_command_impl(arg, fn)
     else:

+ 2 - 3
rollbot/rollbot/failure.py

@@ -1,11 +1,10 @@
 from __future__ import annotations
 
-from typing import Optional
 from enum import Enum, auto
 
 
 class RollbotFailureException(BaseException):
-    def __init__(self, failure: RollbotFailure, detail: Optional[str]):
+    def __init__(self, failure: RollbotFailure, detail: str | None):
         super().__init__()
         self.failure = failure
         self.detail = detail
@@ -20,5 +19,5 @@ class RollbotFailure(Enum):
     PERMISSIONS = auto()
     INTERNAL_ERROR = auto()
 
-    def raise_exc(self, detail: Optional[str] = None):
+    def raise_exc(self, detail: str | None = None):
         raise RollbotFailureException(self, detail)

+ 5 - 5
rollbot/rollbot/injection/args.py

@@ -1,5 +1,5 @@
 from argparse import ArgumentParser, Namespace
-from typing import Optional, TypeVar
+from typing import TypeVar
 from collections.abc import Callable
 import shlex
 
@@ -25,7 +25,7 @@ class ArgAccessorBase:
 
 
 class ArgListSplitOn(Injector[list[str]], ArgAccessorBase):
-    def __init__(self, split: Optional[str] = None):
+    def __init__(self, split: str | None = None):
         self.split = split
 
     async def inject(self, message: Message, context: Context) -> str:
@@ -60,9 +60,9 @@ class Arg(Injector[ArgType]):
         index: int = 0,
         convert: Callable[[str], ArgType] = str,
         required: bool = True,
-        default: Optional[ArgType] = None,
-        missing_msg: Optional[str] = None,
-        fail_msg: Optional[str] = None,
+        default: ArgType | None = None,
+        missing_msg: str | None = None,
+        fail_msg: str | None = None,
     ):
         self.index = index
         self.convert = convert

+ 15 - 7
rollbot/rollbot/injection/data.py

@@ -1,4 +1,4 @@
-from typing import Generic, TypeVar, Type, Optional, Any
+from typing import Generic, TypeVar, Type, Any
 from collections.abc import AsyncGenerator
 import dataclasses
 import json
@@ -34,7 +34,9 @@ class DataStore(Generic[DataType]):
         self.datatype = datatype
         self.connection = connection
         self.table_name = "".join(
-            ("_" + c.lower()) if "A" <= c <= "Z" else c for c in datatype.__name__ if c.isalnum()
+            ("_" + c.lower()) if "A" <= c <= "Z" else c
+            for c in datatype.__name__
+            if c.isalnum()
         ).strip("_")
 
     async def _setup(self):
@@ -46,7 +48,7 @@ class DataStore(Generic[DataType]):
         )
         await self.connection.commit()
 
-    async def load(self, key: str) -> Optional[DataType]:
+    async def load(self, key: str) -> DataType | None:
         async with self.connection.execute(
             f"SELECT body FROM {self.table_name} WHERE key = ?", (key,)
         ) as cursor:
@@ -67,9 +69,13 @@ class DataStore(Generic[DataType]):
         query = f"SELECT key, body FROM {self.table_name}"
         filter_params = []
         if len(kw) > 0:
-            query += " WHERE " + (" AND ".join("json_extract(body, ?) = ?" for _ in range(len(kw))))
-            for (key, value) in kw.items():
-                filter_params.append(f"$.{''.join(k for k in key if k.isalnum() or k == '_')}")
+            query += " WHERE " + (
+                " AND ".join("json_extract(body, ?) = ?" for _ in range(len(kw)))
+            )
+            for key, value in kw.items():
+                filter_params.append(
+                    f"$.{''.join(k for k in key if k.isalnum() or k == '_')}"
+                )
                 filter_params.append(value)
         async with self.connection.execute(query, filter_params) as cursor:
             async for (key, body) in cursor:
@@ -86,7 +92,9 @@ class DataStore(Generic[DataType]):
 
 
 class DataFor(Injector[DataType]):
-    def __init__(self, datatype: Type[DataType], key: Injector[str], kwargs: dict[str, Any]):
+    def __init__(
+        self, datatype: Type[DataType], key: Injector[str], kwargs: dict[str, Any]
+    ):
         self.datatype = datatype
         self.key = key
         self.kwargs = kwargs

+ 4 - 5
rollbot/rollbot/injection/subcommand.py

@@ -1,7 +1,6 @@
-from collections.abc import Callable
-from typing import TypeVar, Optional
+from typing import TypeVar
 
-from ..types import Message, Context, Command
+from ..types import Message, Command
 from ..failure import RollbotFailure
 from .base import Injector, Simple
 from .args import ArgListSplitOn, ArgParse, Arg
@@ -37,8 +36,8 @@ class SubcArg(Arg):
         return SubcArgList
 
 
-class _Subcommand(Simple[Optional[Command]]):
-    Args = Simple[Optional[str]](lambda m, c: get_subc_args(m))
+class _Subcommand(Simple[Command | None]):
+    Args = Simple[str | None](lambda m, c: get_subc_args(m))
     ArgList = SubcArgList
     ArgListSplitOn = SubcArgListSplitOn
     ArgParse = SubcArgParse

+ 9 - 4
rollbot/rollbot/injection/util.py

@@ -1,5 +1,5 @@
 from collections.abc import Callable, Coroutine
-from typing import TypeVar, Any, Optional
+from typing import TypeVar, Any
 from datetime import datetime
 from logging import Logger
 
@@ -39,7 +39,7 @@ Sender = Simple[str](lambda m, c: m.sender_id)
 Timestamp = Simple[datetime](lambda m, c: m.timestamp)
 OriginAdmin = Simple[bool](lambda m, c: m.origin_admin)
 ChannelAdmin = Simple[bool](lambda m, c: m.channel_admin)
-SenderName = Simple[Optional[str]](lambda m, c: m.sender_name)
+SenderName = Simple[str | None](lambda m, c: m.sender_name)
 Text = Simple[str](lambda m, c: m.text)
 Attachments = Simple[list[Attachment]](lambda m, c: m.attachments)
 Reply = Simple[Attachment](lambda m, c: Attachment(name="reply", body=m.message_id))
@@ -47,7 +47,8 @@ CommandInjector = Simple[Command](lambda m, c: m.command)
 Request = Simple[ClientSession](lambda m, c: c.request)
 Respond = Simple[Callable[[], Coroutine[None, None, None]]](lambda m, c: c.respond)
 LoggerInjector = Simple[Logger](lambda m, c: c.logger)
-Debugging = Simple[Optional[str]](lambda m, c: c.get_debugging())
+Debugging = Simple[str | None](lambda m, c: c.get_debugging())
+
 
 class Config(Injector[Any]):
     def __init__(self, key: str):
@@ -84,9 +85,13 @@ class Lazy(InjectorWithCleanup[Callable[[], Coroutine[None, None, Dep]]]):
         return _Wrapper(self.deferred)
 
     async def cleanup(self, dep: Callable[[], Coroutine[None, None, Dep]]):
-        if isinstance(self.deferred, InjectorWithCleanup) and dep._calculated is not None:
+        if (
+            isinstance(self.deferred, InjectorWithCleanup)
+            and dep._calculated is not None
+        ):
             await self.deferred.cleanup(dep._calculated)
 
+
 class Const(Injector[Dep]):
     def __init__(self, const: Dep):
         self.const = const

+ 13 - 13
rollbot/rollbot/types.py

@@ -4,7 +4,7 @@ from logging import Logger
 from dataclasses import dataclass, field
 from datetime import datetime
 from collections.abc import Callable, Coroutine, Container
-from typing import Union, Any, Optional
+from typing import Any
 import time
 
 from aiosqlite import Connection
@@ -25,7 +25,7 @@ __all__ = [
 @dataclass
 class Attachment:
     name: str
-    body: Union[str, bytes, None, Callable[[], Coroutine[Any, None, Any]]]
+    body: str | bytes | None | Callable[[], Coroutine[Any, None, Any]]
 
 
 @dataclass
@@ -36,12 +36,12 @@ class Message:
     timestamp: datetime
     origin_admin: bool
     channel_admin: bool
-    sender_name: Optional[str] = None
-    text: Optional[str] = None
+    sender_name: str | None = None
+    text: str | None = None
     attachments: list[Attachment] = field(default_factory=list)
-    message_id: Optional[str] = None
+    message_id: str | None = None
     received_at: float = field(default_factory=time.time)
-    command: Optional[Command] = None
+    command: Command | None = None
 
 
 @dataclass
@@ -56,7 +56,7 @@ class Command:
         self.cache = {}
 
     @staticmethod
-    def from_text(text: str) -> Optional[Command]:
+    def from_text(text: str) -> Command | None:
         cleaned = text.lstrip()
 
         if len(cleaned) < 2:
@@ -88,13 +88,13 @@ class Command:
 class Response:
     origin_id: str
     channel_id: str
-    text: Optional[str] = None
-    attachments: Optional[list[Attachment]] = None
-    cause: Optional[Message] = None
+    text: str | None = None
+    attachments: list[Attachment | None] = None
+    cause: Message | None = None
 
     @staticmethod
     def from_message(
-        msg: Message, text: Optional[str] = None, attachments: list[Attachment] = None
+        msg: Message, text: str | None = None, attachments: list[Attachment] = None
     ) -> Response:
         return Response(
             origin_id=msg.origin_id,
@@ -112,9 +112,9 @@ class Context:
     request: ClientSession
     database: Callable[[], Coroutine[None, None, Connection]]
     logger: Logger
-    debugging: Optional[str] = None
+    debugging: str | None = None
 
-    def get_debugging(self) -> Optional[str]:
+    def get_debugging(self) -> str | None:
         old = self.debugging
         self.debugging = None
         return old