123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596 |
import json
import math
import random
from dataclasses import dataclass, field
from logging import Logger
from typing import Optional

from sudoku_py import SudokuGenerator, Sudoku

from rollbot import as_command, initialize_data, RollbotFailure, Attachment
from rollbot.injection import Data, Sender, Config, Const, Arg, Reply, OriginAdmin, Args, Attachments, Origin, Request
- # View
- # !wallet - shows your number of rollcoins, NFTs (Non-Functional Tamagotchis), and market balance
- # !blockchain - shows the contents of all wallets and the rollcoins in the treasury
- # Generate
- # !mine - provides a sudoku that can be solved for rollcoins, which also adds to the treasury
- # !appraise - rollbot will purchase a post, based indirectly on number of likes received
- # Spend
- # !tip - transfer rollcoins from one person to another
- # !gacha - insert a rollcoin (which enters the treasury), receive a random NFT
- # !donate - split an amount and donate it to everyone
- # Market
- # !bet - put some amount of rollcoins into the market, some portion of which will be put in the treasury, evolves market
- # !cash - take some amount of rollcoins from the market, up to investment + the number of coins in the treasury, evolves market
- # Admin
- # !deflate - deflate all coins by a power of 10 (negative to inflate economy)
- # !brrr - print some number of coins into the treasury (the amount must be positive)
- # !tax - take some number of coins from a wallet and put it in the treasury (uses the wallet owner's name, negative to give coins)
- # !mine skip - skip the current mining puzzle
- # !market - force the market to transition to a given state
@initialize_data
@dataclass
class RollcoinState:
    """Economy-wide RollCoin state, kept as a single record under GLOBAL_STATE_KEY."""

    # Coins currently held by the treasury.
    treasury: float
    # Board of the active mining sudoku; None until a puzzle has been generated.
    mining_puzzle: Optional[list[list[int]]]
    # Index of the current market state; the market transition, multiplier and
    # message configuration are all indexed by this value.
    market_state: int
    # Identifiers of the messages that !appraise has already paid out for.
    appraised: list[str] = field(default_factory=list)
@initialize_data
@dataclass
class RollcoinWallet:
    """One user's RollCoin wallet."""

    # Spendable coins.
    balance: float = 0
    # Coins currently in the market; scaled by the market multiplier on every trade.
    holdings: float = 0
    # Amount bet into the market, reduced (never below zero) when cashing out.
    cost_basis: float = 0
    # Description strings of the NFTs received from gacha pulls.
    nfts: list[str] = field(default_factory=list)

    def __str__(self):
        """Render a short multi-line summary of the wallet."""
        sections = [f"Wallet: {self.balance}"]

        has_investments = self.holdings > 0 or self.cost_basis > 0
        if has_investments:
            sections.append(f"Investments: {self.holdings}\n\t(cost basis {self.cost_basis})")

        # Only the NFT count is shown: writing out every NFT produces a TON of spam.
        nft_count = len(self.nfts)
        if nft_count > 0:
            sections.append(f"NFTs: {nft_count}")

        return "\n".join(sections)
# Keyword amounts. Each entry maps a keyword to a function that receives the
# caller's limit (the balance or holdings being drawn from) and returns the
# concrete amount to use.
SPECIAL_AMOUNTS = {
    "all": lambda limit: limit,
    "half": lambda limit: limit / 2,
    "frac": lambda limit: limit - int(limit),
}


def convert_amount(amount):
    """
    Convert a textual amount argument.

    Returns the matching SPECIAL_AMOUNTS function when ``amount`` is one of the
    keywords (case-insensitive); the caller is expected to invoke it with the
    relevant limit. Otherwise returns ``amount`` parsed as a float.

    Raises ValueError when the text is not a number, or when it parses to a
    non-finite value ("nan", "inf", "1e999", ...). NaN in particular compares
    False against everything, so it would slip through the callers'
    ``amount > balance`` and ``amount <= 0`` guards and poison wallet balances.
    """
    special = SPECIAL_AMOUNTS.get(amount.lower())
    if special is not None:
        return special

    value = float(amount)
    if not math.isfinite(value):
        raise ValueError(f"{amount!r} is not a finite number")
    return value
# constants

# Key under which the single RollcoinState record is stored.
GLOBAL_STATE_KEY = "ROLLCOIN_GLOBAL_STATE"

# External services used by !gacha for the NFT's name and icon.
NAME_URL = "https://randommer.io/api/Name?nameType=firstname&quantity=1"
ICON_URL = "https://app.pixelencounter.com/api/basic/svgmonsters/image/png?size=256&fillType=2"

# Common colors, followed by the three special colors in increasing rarity.
NFT_COLORS = ["red", "blue", "yellow", "green", "orange", "purple", "brown", "gray"]
NFT_UNCOMMON, NFT_RARE, NFT_ULTRA = "silver", "gold", "black"

# Rarity score of each special color; colors missing from this table score 0.
NFT_RARITY_LOOKUP = {
    color: score
    for score, color in enumerate((NFT_UNCOMMON, NFT_RARE, NFT_ULTRA), start=1)
}

# Display label for each rarity score, indexed by score.
NFT_RARITY = ["common", "uncommon", "rare!", "ultra rare!"]
# injection values

# The global economy record, with the field values to use when none is stored yet.
State = Data(RollcoinState).For(
    Const(GLOBAL_STATE_KEY),
    treasury=0,
    mining_puzzle=None,
    market_state=0,
)

# The wallet of whoever sent the command; new wallets start with 10 coins.
SenderWallet = Data(RollcoinWallet).For(Sender, balance=10)

# Mapping of lower-case names to wallet ids.
WalletLookup = Config("rollcoin.wallet_names")

# Market configuration, each indexed by market state: the row of transition
# weights, the holdings multiplier, and the status message.
MarketTransitions = Config("rollcoin.market.transitions")
MarketMultipliers = Config("rollcoin.market.multipliers")
MarketMessages = Config("rollcoin.market.messages")

# API key sent as the X-Api-Key header when fetching gacha names.
NameAPIKey = Config("rollcoin.gacha.name_api_key")
@as_command
def wallet(sender_wallet: SenderWallet, reply: Reply):
    """
    View the contents of your wallet. Using this command once will initialize your wallet.
    """
    # The wallet renders itself through RollcoinWallet.__str__; the injected
    # reply value is returned alongside the text (presumably so the response is
    # sent as a reply to the sender - confirm against rollbot).
    summary = f"You currently own...\n{sender_wallet}"
    return summary, reply
@as_command
async def blockchain(wallets: Data(RollcoinWallet), wallet_lookup: WalletLookup, state: State):
    """
    View the contents of everyone's wallets
    """
    # The lookup maps name -> wallet id; invert it so wallets can be labelled by owner.
    owner_by_id = {wallet_id: name.title() for name, wallet_id in wallet_lookup.items()}

    sections = [f"Blockchain:\n\tTreasury: {state.treasury}"]
    async for (sender_id, wallet) in wallets.all():
        heading = owner_by_id.get(sender_id, f"Unnamed wallet {sender_id}")
        # Indent every line of the wallet summary beneath its heading.
        indented = "\n".join("\t" + line for line in str(wallet).split("\n"))
        sections.append(f"{heading}:\n{indented}")

    return "\n\n".join(sections).strip()
@as_command
async def tip(
    wallets: Data(RollcoinWallet),
    sender_id: Sender,
    sender_wallet: SenderWallet,
    wallet_lookup: WalletLookup,
    target_name: Arg(0, missing_msg="You must tell me who to tip!"),
    amount: Arg(1, convert=convert_amount, missing_msg="You must provide an amount to tip!", fail_msg="Could not parse {} as value"),
):
    """
    Send RollCoins to another person, as in !tip [person] [amount]
    """
    if not isinstance(amount, float):
        # Special keyword amounts ("all", "half", ...) arrive as a function of the balance.
        amount = amount(sender_wallet.balance)

    if sender_wallet.balance == 0:
        return "Sorry! You don't have any rollcoins right now - try mining!"
    if amount > sender_wallet.balance:
        return f"Sorry! You only have {sender_wallet.balance} RollCoins available - try mining for more!"
    if amount <= 0:
        RollbotFailure.INVALID_ARGUMENTS.raise_exc(f"Amount must be positive, not {amount}")

    target_id = wallet_lookup.get(target_name.lower())
    if target_id is None:
        RollbotFailure.INVALID_ARGUMENTS.raise_exc(f"Cannot find wallet belonging to {target_name}")
    if target_id == sender_id:
        # A self-tip would load a second copy of the sender's own wallet and
        # then save both copies under the same id; whichever save ran last
        # would win, so the tipped coins could simply disappear.
        RollbotFailure.INVALID_ARGUMENTS.raise_exc("You cannot tip yourself")

    target_wallet = await wallets.load_or(target_id)
    sender_wallet.balance -= amount
    target_wallet.balance += amount

    # Persist the debit before the credit so that a failure between the two
    # saves cannot leave freshly created coins behind.
    await wallets.save(sender_id, sender_wallet)
    await wallets.save(target_id, target_wallet)
    return f"Done! {target_name} now has {target_wallet.balance} RollCoins!"
@as_command
async def donate(
    wallets: Data(RollcoinWallet),
    sender_id: Sender,
    sender_wallet: SenderWallet,
    wallet_store: Data(RollcoinWallet),
    amount: Arg(0, convert=convert_amount, missing_msg="You must provide an amount to donate!", fail_msg="Could not parse {} as value"),
):
    """
    Donate RollCoins to everyone who has a wallet, the number of coins you specify will be divided amongst everyone else.
    """
    # NOTE: `wallets` and `wallet_store` are the same injected store; `wallets`
    # is kept only so the command's signature stays unchanged.
    if not isinstance(amount, float):
        # Special keyword amounts ("all", "half", ...) arrive as a function of the balance.
        amount = amount(sender_wallet.balance)

    if sender_wallet.balance == 0:
        return "Sorry! You don't have any rollcoins right now - try mining!"
    if amount > sender_wallet.balance:
        return f"Sorry! You only have {sender_wallet.balance} RollCoins available - try mining for more!"
    if amount <= 0:
        RollbotFailure.INVALID_ARGUMENTS.raise_exc(f"Amount must be positive, not {amount}")

    # Find the recipients BEFORE touching the sender's balance: with nobody
    # else to donate to, the division below would fail after the coins had
    # already been deducted and saved.
    recipients = [(wid, w) async for wid, w in wallet_store.all() if wid != sender_id]
    if not recipients:
        return "Sorry! There is nobody else to donate to right now."

    sender_wallet.balance -= amount
    await wallet_store.save(sender_id, sender_wallet)

    to_donate = amount / len(recipients)
    for (wallet_id, recipient) in recipients:
        recipient.balance += to_donate
        await wallet_store.save(wallet_id, recipient)

    return f"Done! Donated {to_donate} to each other wallet, leaving you with {sender_wallet.balance} RollCoins!"
async def evolve_market(state, transitions, multipliers, wallet_store, state_store):
    """
    Advance the market one step and apply the new state's multiplier to all holdings.

    The next state is drawn at random, weighted by the transition row of the
    current state. The state record is saved here (including any changes the
    caller already made to it), then every wallet's holdings are scaled by the
    multiplier of the new state and saved.
    """
    possible_states = range(len(transitions))
    weights = transitions[state.market_state]
    (state.market_state,) = random.choices(possible_states, weights=weights, k=1)
    await state_store.save(GLOBAL_STATE_KEY, state)

    scale = multipliers[state.market_state]
    async for (wallet_id, wallet) in wallet_store.all():
        wallet.holdings *= scale
        await wallet_store.save(wallet_id, wallet)
@as_command
async def bet(
    sender_id: Sender,
    sender_wallet: SenderWallet,
    wallet_store: Data(RollcoinWallet),
    state: State,
    state_store: Data(RollcoinState),
    transitions: MarketTransitions,
    multipliers: MarketMultipliers,
    messages: MarketMessages,
    amount: Arg(0, convert=convert_amount, missing_msg="You must provide an amount to bet!", fail_msg="Could not parse {} as value"),
    reply: Reply,
):
    """
    Bet some number of RollCoins on the market. These coins will leave your wallet.
    """
    available = sender_wallet.balance
    # Plain numbers arrive as floats; special keyword amounts arrive as a
    # function of the available balance.
    stake = amount if isinstance(amount, float) else amount(available)

    if available == 0:
        yield "Sorry! You don't have any rollcoins right now - try mining!"
        return
    if stake > available:
        yield f"Sorry! You only have {available} RollCoins available - try mining for more!"
        return
    if stake <= 0:
        RollbotFailure.INVALID_ARGUMENTS.raise_exc(f"Amount must be positive, not {stake}")

    # Move the stake from the spendable balance into the market.
    sender_wallet.balance -= stake
    sender_wallet.holdings += stake
    sender_wallet.cost_basis += stake
    await wallet_store.save(sender_id, sender_wallet)

    # The staked coins enter the treasury; evolve_market persists the state.
    state.treasury += stake
    await evolve_market(state, transitions, multipliers, wallet_store, state_store)

    # Reload the wallet so the summary reflects the market step just applied.
    refreshed = await wallet_store.load(sender_id)
    yield f"Trade complete!\n{refreshed}", reply
    yield f"Market status: {messages[state.market_state]}"
@as_command
async def cash(
    sender_id: Sender,
    sender_wallet: SenderWallet,
    wallet_store: Data(RollcoinWallet),
    state: State,
    state_store: Data(RollcoinState),
    transitions: MarketTransitions,
    multipliers: MarketMultipliers,
    messages: MarketMessages,
    amount: Arg(0, convert=convert_amount, missing_msg="You must provide an amount to cash out!", fail_msg="Could not parse {} as value"),
    reply: Reply,
):
    """
    Cash some number of RollCoins out of the market.
    Because RollCoins are the one true fiat currency, you can only cash out up to the number of coins available in the treasury.
    """
    if not isinstance(amount, float):
        # Special keyword amounts ("all", "half", ...) arrive as a function of the holdings.
        amount = amount(sender_wallet.holdings)

    if sender_wallet.holdings == 0:
        yield "Sorry! You don't have any rollcoins invested right now - try betting!"
        return
    if amount > sender_wallet.holdings:
        yield f"Sorry! You only have {sender_wallet.holdings} RollCoins in the market - try betting more first!"
        return
    if amount <= 0:
        RollbotFailure.INVALID_ARGUMENTS.raise_exc(f"Amount must be positive, not {amount}")

    # `<= 0` rather than `== 0`: rebates issued through !tax can push the
    # treasury below zero, and paying out a negative "drained" amount would
    # take coins away from the seller.
    if state.treasury <= 0:
        yield "Sorry! The treasury is actually empty right now so uh... no one can sell..."
        return

    if amount <= state.treasury:
        response = "Successfully sold!"
        actual_amount = amount
    else:
        # The treasury cannot cover the request: pay out whatever it holds.
        response = f"Funny story, I'm actually out of coins! I gave you what I could, draining the treasury, which was {state.treasury}."
        actual_amount = state.treasury

    sender_wallet.balance += actual_amount
    sender_wallet.holdings -= actual_amount
    # Market gains can make the payout exceed what was bet; never go below zero.
    sender_wallet.cost_basis = max(sender_wallet.cost_basis - actual_amount, 0)
    await wallet_store.save(sender_id, sender_wallet)

    # The paid-out coins leave the treasury; evolve_market persists the state.
    state.treasury -= actual_amount
    await evolve_market(state, transitions, multipliers, wallet_store, state_store)

    # Reload the wallet so the summary reflects the market step just applied.
    yield f"{response}\n{await wallet_store.load(sender_id)}", reply
    yield f"Market status: {messages[state.market_state]}"
@as_command
async def mine(
    args: Args,
    admin: OriginAdmin,
    state: State,
    sender_id: Sender,
    sender_wallet: SenderWallet,
    wallet_store: Data(RollcoinWallet),
    state_store: Data(RollcoinState),
    reply: Reply,
):
    """
    Trade solved Sudokus for RollCoins. Admins can skip the current puzzle with !mine skip
    """
    required_digits = set(range(1, 10))

    async def generate_puzzle():
        # Replace the active puzzle with a freshly generated one and persist the state.
        # The digits themselves are shuffled before being paired with the
        # letters: shuffling (letter, digit) pairs and then building a dict
        # from them always produces the same letter -> digit mapping.
        digits = list(range(1, 10))
        random.shuffle(digits)
        gen = SudokuGenerator(9)
        # NOTE(review): generate(0) presumably yields a full board and the
        # second generate() call blanks that many cells - confirm against sudoku_py.
        gen.generate(0)
        gen.board_exchange_values(dict(zip("abcdefghi", digits)))
        gen.generate(random.randint(10, 50))
        state.mining_puzzle = gen.board
        await state_store.save(GLOBAL_STATE_KEY, state)

    def challenge_message():
        # Render the active puzzle for display.
        return f"The current mining challenge is\n{Sudoku(board=state.mining_puzzle)}"

    def matches_puzzle(grid):
        # True when every pre-filled (non-zero) puzzle cell is unchanged in the grid.
        return all(
            cell in (0, grid[i][j])
            for i, puzzle_row in enumerate(state.mining_puzzle)
            for j, cell in enumerate(puzzle_row)
        )

    def is_complete(grid):
        # True when every row, column and 3x3 box holds each of 1-9 exactly once.
        columns = [list(column) for column in zip(*grid)]
        boxes = [
            [cell for grid_row in grid[i:i + 3] for cell in grid_row[j:j + 3]]
            for i in range(0, 9, 3)
            for j in range(0, 9, 3)
        ]
        return all(set(group) == required_digits for group in grid + columns + boxes)

    if not args:
        if state.mining_puzzle is None:
            await generate_puzzle()
        yield challenge_message()
        return

    if "skip" in args.strip(" !/").lower():
        if not admin:
            RollbotFailure.PERMISSIONS.raise_exc("Only admins can skip the current mining challenge")
        await generate_puzzle()
        yield f"Admin has skipped the previous mining challenge. The new challenge is:\n{Sudoku(board=state.mining_puzzle)}"
        return

    if state.mining_puzzle is None:
        # Nothing to check a submission against yet: create a puzzle and show it.
        await generate_puzzle()
        yield challenge_message()
        return

    # Best effort parse of the input into a sudoku grid: keep only the decimal
    # digits and cut them into rows of nine, ignoring a trailing partial row.
    # isdecimal() is used because isdigit() also accepts characters such as
    # superscripts, which int() cannot convert.
    digits = [int(c) for c in args if c.isdecimal()]
    parsed = [digits[start:start + 9] for start in range(0, len(digits) - 8, 9)]
    if len(parsed) != 9:
        # Without this guard a short grid would raise IndexError further down.
        RollbotFailure.INVALID_ARGUMENTS.raise_exc("Could not parse that solution")
    try:
        Sudoku(board=parsed, block_width=3, block_height=3)
    except Exception:
        RollbotFailure.INVALID_ARGUMENTS.raise_exc("Could not parse that solution")

    if not matches_puzzle(parsed):
        yield "Sorry, that solution doesn't match the puzzle!", reply
        yield challenge_message()
        return
    if not is_complete(parsed):
        yield "Sorry, that solution isn't valid!", reply
        yield challenge_message()
        return

    # The reward grows with the number of blank cells in the solved puzzle.
    zeroes = sum(1 for puzzle_row in state.mining_puzzle for cell in puzzle_row if cell == 0)
    difficulty = (zeroes - 10) / 4
    value = round(abs(random.gauss(difficulty * 200, 25)), 2)
    sender_wallet.balance += value
    await wallet_store.save(sender_id, sender_wallet)

    # A tenth of the reward is also minted into the treasury; generate_puzzle saves the state.
    state.treasury += value / 10
    await generate_puzzle()
    yield f"Looks right to me! You earned {value} RollCoins (and generated some for the treasury). That brings your balance to {sender_wallet.balance}", reply
    yield challenge_message()
@as_command
async def appraise(
    logger: Logger,
    origin: Origin,
    attachments: Attachments,
    sender_id: Sender,
    sender_wallet: SenderWallet,
    wallet_store: Data(RollcoinWallet),
    state: State,
    state_store: Data(RollcoinState),
    reply: Reply,
):
    """
    Use this command while replying to a popular post, and I will purchase it for some RollCoins!
    """
    if origin == "GROUPME":
        try:
            reply_attachment = next(a for a in attachments if a.name == "reply")
            target_msg = json.loads(await reply_attachment.body())
            appraisal_id = f"GROUPME-{target_msg['group_id']}-{target_msg['id']}"
            # The number of likes on the replied-to message drives its value.
            score_base = len(target_msg["favorited_by"])
        except Exception:
            # `except Exception` rather than a bare `except`: a bare clause
            # would also swallow asyncio.CancelledError raised at the await
            # above and turn a task cancellation into a user-facing failure.
            logger.exception("Failed appraisal, logging for debugging")
            RollbotFailure.INVALID_ARGUMENTS.raise_exc("Reply to a message to have it appraised, I could not read that one")
    else:
        # other origins can be handled here (as further branches above)
        RollbotFailure.INVALID_COMMAND.raise_exc(f"Message appraisal is not implemented in this platform (origin was {origin})")

    if appraisal_id in state.appraised:
        return "Sorry, I've already purchased that message!", reply
    if score_base <= 2:
        return "Sorry, I don't think that message is worth very much...", reply

    value = round(abs(random.gauss(score_base * 50, 15)), 2)
    sender_wallet.balance += value
    await wallet_store.save(sender_id, sender_wallet)

    # Remember the message so it cannot be sold twice.
    state.appraised.append(appraisal_id)
    await state_store.save(GLOBAL_STATE_KEY, state)
    return f"That post is very interesting! I will purchase it for {value} RollCoins! That brings your balance to {sender_wallet.balance}", reply
def pull_gacha_color():
    """
    Draw one color for a gacha pull.

    A roll of 1-100 picks the tier: 1-69 gives a random common color (69%),
    70-89 the uncommon color (20%), 90-99 the rare color (10%) and 100 the
    ultra rare color (1%).
    """
    roll = random.randint(1, 100)
    if roll < 70:
        return random.choice(NFT_COLORS)
    if roll < 90:
        return NFT_UNCOMMON
    if roll < 100:
        return NFT_RARE
    return NFT_ULTRA
@as_command
async def gacha(
    sender_wallet: SenderWallet,
    sender_id: Sender,
    wallet_store: Data(RollcoinWallet),
    logger: Logger,
    req: Request,
    name_api_key: NameAPIKey,
    reply: Reply,
):
    """
    Spend one RollCoin on a Non-Functional Tamagotchi! You could get a rare one!
    """
    if sender_wallet.balance < 1:
        return f"You only have {sender_wallet.balance} RollCoins available, and gacha pulls cost one each!", reply

    color1 = pull_gacha_color()
    color2 = pull_gacha_color()

    # Fetch the name and the icon before charging, so a failed pull costs nothing.
    try:
        async with req.get(NAME_URL, headers={"X-Api-Key": name_api_key}) as res:
            name = (await res.json())[0]
        async with req.get(ICON_URL + f"&primaryColor={color1}&secondaryColor={color2}") as res:
            img = Attachment(name="image", body=await res.read())
    except Exception:
        # `except Exception` rather than a bare `except`: a bare clause would
        # also swallow asyncio.CancelledError raised at the awaits above and
        # report a task cancellation as a service outage.
        logger.exception("Failed gacha, logging for debugging")
        RollbotFailure.SERVICE_DOWN.raise_exc("Failed to pull from gachapon, your coin has not been deducted")

    # calculate rarity: the floored average of the two colors' scores, plus a
    # bonus point for a matching pair
    rarity = (NFT_RARITY_LOOKUP.get(color1, 0) + NFT_RARITY_LOOKUP.get(color2, 0)) // 2
    if color1 == color2:
        color_info = f"double {color1}!"
        rarity += 1
    else:
        color_info = f"{color1} and {color2}"
    # A score past the end of the label table is shown as legendary.
    rarity_info = "legendary!" if rarity >= len(NFT_RARITY) else NFT_RARITY[rarity]

    info = f"{name} ({color_info}) ({rarity_info})"
    sender_wallet.balance -= 1
    sender_wallet.nfts.append(info)
    await wallet_store.save(sender_id, sender_wallet)

    return f"You received...\n\t{info}", img, reply
- # ADMIN COMMANDS
@as_command
async def deflate(
    origin_admin: OriginAdmin,
    power: Arg(0, convert=int, missing_msg="Must provide power to deflate by", fail_msg="Power to deflate by must be an integer"),
    wallet_store: Data(RollcoinWallet),
    state_store: Data(RollcoinState),
):
    """
    Admins only: Deflate the value of the RollCoin by a factor of 10 to the given power, which can be negative to inflate.
    """
    if not origin_admin:
        RollbotFailure.PERMISSIONS.raise_exc("Only admins can deflate the currency")

    divisor = 10 ** power

    # Rescale the treasury first...
    economy = await state_store.load(GLOBAL_STATE_KEY)
    economy.treasury /= divisor
    await state_store.save(GLOBAL_STATE_KEY, economy)

    # ...then every coin-denominated field of every wallet.
    async for (wallet_id, wallet) in wallet_store.all():
        for coin_field in ("balance", "holdings", "cost_basis"):
            setattr(wallet, coin_field, getattr(wallet, coin_field) / divisor)
        await wallet_store.save(wallet_id, wallet)

    return f"Economy deflated by a factor of 10^{power}"
@as_command
async def brrr(
    origin_admin: OriginAdmin,
    coins: Arg(0, convert=float, missing_msg="Must provide coins to mint", fail_msg="Coins to mint by must be a number"),
    state_store: Data(RollcoinState),
):
    """
    Admins only: Print more RollCoins into the treasury, which can be useful to help the economy! The number of coins must be positive.
    """
    if not origin_admin:
        RollbotFailure.PERMISSIONS.raise_exc("Only admins can mint coins")
    # float() happily parses "nan" and "inf"; NaN is not <= 0, so without the
    # finiteness check it would be added to the treasury and corrupt it.
    if not math.isfinite(coins) or coins <= 0:
        RollbotFailure.INVALID_ARGUMENTS.raise_exc("Can only mint a positive number of coins")

    state = await state_store.load(GLOBAL_STATE_KEY)
    state.treasury += coins
    await state_store.save(GLOBAL_STATE_KEY, state)
    return f"{coins} new RollCoins have been added to the treasury!"
@as_command
async def tax(
    origin_admin: OriginAdmin,
    wallet_lookup: WalletLookup,
    target_name: Arg(0, missing_msg="Must provide a target to tax"),
    coins: Arg(1, convert=float, missing_msg="Must provide coins to tax", fail_msg="Coins to tax by must be a number"),
    wallet_store: Data(RollcoinWallet),
    state_store: Data(RollcoinState),
):
    """
    Admins only: Tax a user to put some of their RollCoins into the treasury, or use a negative number to provide a rebate
    """
    if not origin_admin:
        RollbotFailure.PERMISSIONS.raise_exc("Only admins can tax someone")
    # float() happily parses "nan" and "inf"; either would corrupt both the
    # treasury and the target's balance.
    if not math.isfinite(coins):
        RollbotFailure.INVALID_ARGUMENTS.raise_exc("Coins to tax by must be a number")

    # The target is given by name; it must both be a known name and have a stored wallet.
    target_id = wallet_lookup.get(target_name.lower())
    wallet = None if target_id is None else await wallet_store.load(target_id)
    if wallet is None:
        RollbotFailure.INVALID_ARGUMENTS.raise_exc(f"Could not find a wallet for {target_name}")

    state = await state_store.load(GLOBAL_STATE_KEY)
    # A negative amount moves coins the other way (a rebate out of the treasury).
    state.treasury += coins
    wallet.balance -= coins
    await state_store.save(GLOBAL_STATE_KEY, state)
    await wallet_store.save(target_id, wallet)
    return f"{coins} RollCoins were taken from {target_name} and put in the treasury!"
@as_command
async def market(
    origin_admin: OriginAdmin,
    target_state: Arg(0, convert=int, missing_msg="Must provide target state for market", fail_msg="Target market state must be an integer"),
    state_store: Data(RollcoinState),
    messages: MarketMessages,
):
    """
    Admins only: Force the market into a certain state
    """
    if not origin_admin:
        RollbotFailure.PERMISSIONS.raise_exc("Only admins can manipulate the market")

    # A valid state is an index into the configured market messages.
    if not 0 <= target_state < len(messages):
        RollbotFailure.INVALID_ARGUMENTS.raise_exc("Market state invalid")

    economy = await state_store.load(GLOBAL_STATE_KEY)
    economy.market_state = target_state
    await state_store.save(GLOBAL_STATE_KEY, economy)

    status = messages[economy.market_state]
    return f"Market status: {status}"
|