123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- import uuid
- from typing import List, Tuple
- from sqlalchemy.orm import Session
- from .models import Game, Coordinate, Player, Guess
- from .. import schemas
- def create_game(db: Session, conf: schemas.GameConfig, coords: List[Tuple[float, float]]) -> str:
- if len(coords) != conf.rounds:
- raise ValueError("Insufficient number of coordinates")
-
- game_id = str(uuid.uuid4())
- while db.query(Game).get(game_id) is not None:
- # basically impossible collision, but let's be safe
- game_id = str(uuid.uuid4())
-
- new_game = Game(
- game_id=game_id,
- timer=conf.timer,
- rounds=conf.rounds,
- only_america=conf.only_america,
- generation_method=conf.generation_method,
- rule_set=conf.rule_set,
- )
- db.add(new_game)
- db.add_all([Coordinate(
- game_id=game_id,
- round_number=round_num + 1,
- latitude=lat,
- longitude=lng,
- ) for (round_num, (lat, lng)) in enumerate(coords)])
- db.commit()
- return game_id
- def get_game(db: Session, game_id: str) -> Game:
- return db.query(Game).get(game_id)
- def join_game(db: Session, game_id: str, player_name: str) -> str:
- existing = db.query(Player).filter(Player.game_id == game_id, Player.player_name == player_name).first()
- if existing is not None:
- return None
- new_player = Player(game_id=game_id, player_name=player_name)
- db.add(new_player)
- db.commit()
- return str(new_player.player_id)
- def link_game(db: Session, game_id: str, linked_game: str):
- db.query(Game).get(game_id).linked_game = linked_game
- db.commit()
- def get_player(db: Session, player_id: str) -> Player:
- return db.query(Player).get(int(player_id))
- def get_total_score(player: Player) -> int:
- return sum(g.round_score or 0 for g in player.guesses)
- def get_next_round_number(player: Player) -> int:
- if len(player.guesses) == 0:
- return 1
- next_round = player.guesses[-1].round_number + 1
- if next_round <= player.game.rounds:
- return next_round
- return None
- def get_coordinate(db: Session, game_id: str, round_number: int) -> Coordinate:
- return db.query(Coordinate).get((game_id, round_number))
- def get_next_coordinate(db: Session, player: Player) -> Coordinate:
- round_number = get_next_round_number(player)
- if round_number is None:
- return None
- return get_coordinate(db, player.game_id, round_number)
- def get_next_round_time(player: Player) -> int:
- if player.game.rule_set == schemas.RuleSetEnum.time_bank:
- if len(player.guesses) == 0:
- return player.game.timer * player.game.rounds
- return player.guesses[-1].time_remaining
- return player.game.timer
- def add_guess(db: Session, guess: schemas.Guess, player: Player, round_number: int, score: int) -> bool:
- existing = db.query(Guess).filter(Guess.player_id == player.player_id, Guess.round_number == round_number).first()
- if existing is not None:
- return False
- g = Guess(
- player_id=player.player_id,
- round_number=round_number,
- latitude=guess.lat,
- longitude=guess.lng,
- round_score=score,
- time_remaining=guess.time_remaining,
- )
- db.add(g)
- db.commit()
- if guess.time_remaining <= 0 and player.game.rule_set == schemas.RuleSetEnum.time_bank:
- for r in range(round_number, player.game.rounds):
- add_timeout(db, player, r + 1)
- return True
- def add_timeout(db: Session, player: Player, round_number: int) -> bool:
- return add_guess(db, schemas.Guess(lat=0, lng=0, time_remaining=0), player, round_number, None)
- def get_first_submitter(db: Session, game_id: str, round_number: int) -> str:
- first = db.query(Player.player_name, Guess).filter(
- Player.game_id == game_id,
- Guess.round_number == round_number,
- Player.player_id == Guess.player_id,
- ).order_by(Guess.created_at.desc()).first()
- if first is None:
- return None
- return first[0]
|