#!/usr/bin/env python3
"""Discord stats tracker.

Periodically fetches per-player stats and presence from the DonutSMP API
and keeps a single Discord webhook message up to date with the result.
The message is only re-sent when the fetched data differs from the
snapshot stored in ``CACHE_FILE``.
"""
import json
import os
import time
from datetime import datetime, timezone, timedelta

import requests
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

WEBHOOK_URL = os.getenv('DISCORD_WEBHOOK_URL')
API_KEY = os.getenv('API_KEY')
API_BASE_URL = 'https://api.donutsmp.net/v1/stats'
LOOKUP_BASE_URL = 'https://api.donutsmp.net/v1/lookup'
MESSAGE_ID_FILE = 'message_id.txt'
CACHE_FILE = 'stats_cache.json'
REFRESH_INTERVAL = 60  # 1 minute in seconds

# Custom Discord emoji rendered next to spawner counts.
SPAWNER_EMOJI = '<:spawner:1411753406070263958>'
# Divisor used to turn a shard balance into a spawner count.
SHARDS_PER_SPAWNER = 1500


def sleep_until_next_interval():
    """Sleep until the next REFRESH_INTERVAL boundary on the clock.

    Boundaries are counted from the start of the current hour, so with the
    default 60-second interval the loop wakes at :00 of every minute.
    """
    now = datetime.now()
    # Seconds elapsed in the current interval block. For a 60-second
    # interval this is simply ``now.second``.
    elapsed = (now.minute * 60 + now.second) % REFRESH_INTERVAL
    wait = REFRESH_INTERVAL - elapsed
    next_tick = now + timedelta(seconds=wait)
    print(f"šŸ’¤ Sleeping {wait}s until next update at {next_tick.strftime('%H:%M:%S')}...")
    time.sleep(wait)


def load_players():
    """Load the player list from ``players.json``.

    Returns the value stored under the ``players`` key, or an empty list
    when that key is absent. Raises ``OSError`` if the file cannot be read
    and ``ValueError`` if it is not valid JSON.
    """
    with open('players.json', 'r', encoding='utf-8') as f:
        data = json.load(f)
    return data.get('players', [])


def fetch_player_stats(username):
    """Fetch stats for a single player from the stats API.

    Returns the ``result`` object of the response, or ``None`` when the
    request fails, the player does not exist, or the response is unusable.
    """
    try:
        url = f'{API_BASE_URL}/{username}'
        headers = {'Authorization': API_KEY} if API_KEY else {}
        response = requests.get(url, headers=headers, timeout=10)

        if response.status_code == 200:
            return response.json().get('result', None)
        if response.status_code == 500:
            return None  # Player doesn't exist

        print(f"Error fetching stats for {username}: {response.status_code}")
        return None
    except Exception as e:
        # Deliberately broad: one player's failure must not abort the cycle.
        print(f"Exception fetching stats for {username}: {e}")
        return None


def _offline_presence():
    """Return a fresh presence record describing an offline player."""
    return {'online': False, 'location': 'Offline', 'rank': 'Unknown'}


def fetch_player_presence(username):
    """Fetch online/location presence data for a player from the lookup API.

    Always returns a dict with ``online``, ``location`` and ``rank`` keys;
    any failure is reported as an offline record.
    """
    try:
        url = f'{LOOKUP_BASE_URL}/{username}'
        headers = {'accept': 'application/json'}
        if API_KEY:
            headers['Authorization'] = API_KEY

        response = requests.get(url, headers=headers, timeout=10)

        if response.status_code == 200:
            result = response.json().get('result', {})
            return {
                'online': True,
                'location': result.get('location', 'Unknown'),
                'rank': result.get('rank', 'Unknown'),
            }

        if response.status_code == 500:
            # API returns 500 with this specific message when user is offline.
            try:
                data = response.json()
                if data.get('message') == 'This user is not currently online.':
                    return _offline_presence()
            except (ValueError, AttributeError):
                # Body was not JSON or not an object: fall through and
                # report it as a generic error below.
                pass

        if response.status_code == 401:
            print(f"Error fetching presence for {username}: Unauthorized (check API_KEY)")
        else:
            print(f"Error fetching presence for {username}: {response.status_code}")
        return _offline_presence()
    except Exception as e:
        # Deliberately broad: one player's failure must not abort the cycle.
        print(f"Exception fetching presence for {username}: {e}")
        return _offline_presence()


def get_presence_icon(location, online):
    """Map DonutSMP location to Discord emoji state"""
    if not online:
        return '⚫'

    normalized = str(location).strip().lower()
    if normalized in {'limbo', 'limbos'}:
        return 'šŸ’¤'
    if normalized == 'spawn':
        return '🟔'
    # 'overworld', 'nether', 'end' and any unrecognised location.
    return '🟢'


def get_presence_shortcode(location, online):
    """Map DonutSMP location to Discord shortcode emoji for table rendering."""
    if not online:
        return ':black_circle:'

    normalized = str(location).strip().lower()
    if normalized in {'limbo', 'limbos'}:
        return ':zzz:'
    if normalized == 'spawn':
        return ':yellow_circle:'
    # 'overworld', 'nether', 'end' and any unrecognised location.
    return ':green_circle:'


def format_number(value):
    """Format a number with K/M/B suffixes for readability.

    Placeholder values ('N/A' and the failure marker) are returned
    unchanged; anything that cannot be converted to ``int`` is returned as
    its string form.
    """
    if value == 'N/A' or value == 'āŒ':
        return value

    try:
        num = int(value)
    except (ValueError, TypeError):
        # Not a plain number (e.g. an already formatted string).
        return str(value)

    if num >= 1_000_000_000:
        return f"{num / 1_000_000_000:.2f} B"
    if num >= 1_000_000:
        return f"{num / 1_000_000:.2f} M"
    if num >= 1_000:
        return f"{num / 1_000:.2f} K"
    return str(num)


def format_playtime(milliseconds):
    """Format playtime from milliseconds to readable format (XdYhZm)"""
    if milliseconds == 'N/A' or milliseconds == 'āŒ':
        return milliseconds

    try:
        total_seconds = int(milliseconds) // 1000
    except (ValueError, TypeError):
        return str(milliseconds)

    days = total_seconds // 86400
    hours = (total_seconds % 86400) // 3600
    minutes = (total_seconds % 3600) // 60

    parts = []
    if days > 0:
        parts.append(f"{days}d")
    if hours > 0:
        parts.append(f"{hours}h")
    if minutes > 0:
        parts.append(f"{minutes}m")

    return " ".join(parts) if parts else "0m"


def format_stats_message(players_data, presence_data):
    """Format player stats as Discord embed fields (one field per column, inline).

    ``players_data`` maps username to a stats dict (or a falsy value when
    stats are unavailable); ``presence_data`` maps username to a presence
    dict. Returns a webhook payload of the form ``{"embeds": [embed]}``.
    """
    col_players = []
    col_money = []
    col_shards = []
    total_money = 0
    total_spawners = 0

    for username, stats in players_data.items():
        presence = presence_data.get(username, {'online': False, 'location': 'Offline'})
        status = get_presence_shortcode(
            presence.get('location', 'Offline'),
            presence.get('online', False),
        )

        if stats:
            raw_money = stats.get('money', 0) or 0
            raw_shards = stats.get('shards', 0) or 0
            try:
                total_money += int(raw_money)
                spawners = int(raw_shards) // SHARDS_PER_SPAWNER
                total_spawners += spawners
            except (ValueError, TypeError):
                spawners = 0
            money = format_number(raw_money)
            shards_display = f"{format_number(raw_shards)} ({spawners} {SPAWNER_EMOJI} )"
        else:
            money = shards_display = 'āŒ'

        col_players.append(f"{status} {username}")
        col_money.append(money)
        col_shards.append(shards_display)

    # Append totals after a divider line
    col_players.append("─────")
    col_money.append(f"**{format_number(total_money)}**")
    col_shards.append(f"**{total_spawners} {SPAWNER_EMOJI}**")

    def field(name, values, inline=True):
        return {"name": name, "value": "\n".join(values), "inline": inline}

    embed = {
        "title": "šŸ“Š Server Stats Tracker",
        "color": 3447003,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "fields": [
            field(":green_circle: Player", col_players),
            field(":moneybag: Money", col_money),
            field(":crystal_ball: Shards", col_shards),
        ],
    }

    return {"embeds": [embed]}


def get_saved_message_id():
    """Retrieve the saved Discord message ID, or ``None`` if unavailable."""
    try:
        with open(MESSAGE_ID_FILE, 'r') as f:
            return f.read().strip()
    except (OSError, ValueError):
        # Missing or unreadable file: behave as if no message was posted yet.
        return None


def get_cached_stats():
    """Retrieve the cached snapshot from the last update, or ``None``."""
    try:
        with open(CACHE_FILE, 'r') as f:
            return json.load(f)
    except (OSError, ValueError):
        # Missing, unreadable or corrupt cache (json.JSONDecodeError is a
        # ValueError): treat it as a first run.
        return None


def save_cached_stats(players_data):
    """Save the current stats to cache file"""
    with open(CACHE_FILE, 'w') as f:
        json.dump(players_data, f)


def stats_changed(old_data, new_data):
    """Check if stats have changed between updates.

    Returns True on the first run, when the cache uses an older format, or
    when the cached snapshot differs from ``new_data``.
    """
    if old_data is None:
        print(" ā„¹ļø First run - no cached stats")
        return True  # First run

    # Backward compatibility for old cache format.
    if not isinstance(old_data, dict) or 'stats' not in old_data or 'presence' not in old_data:
        print(" ā„¹ļø Cache format changed")
        return True

    if old_data != new_data:
        print(" ā„¹ļø Stats or presence changed")
        return True

    return False


def save_message_id(message_id):
    """Save the Discord message ID for future edits"""
    with open(MESSAGE_ID_FILE, 'w') as f:
        f.write(str(message_id))


def post_message(content):
    """Post a new message to the Discord webhook.

    Returns True when Discord accepted the message (the returned message ID
    is stored for later edits when available), False otherwise.
    """
    try:
        # Use wait=true to get the message ID in response
        url = f"{WEBHOOK_URL}?wait=true"
        response = requests.post(url, json=content, timeout=10)

        if response.status_code not in [200, 204]:
            print(f"āŒ Error posting message: {response.status_code}")
            print(response.text)
            return False

        # Discord returns the message data in JSON
        try:
            message_id = response.json().get('id')
            if message_id:
                save_message_id(message_id)
                print(f"āœ… Posted new message: {message_id}")
            else:
                print("āœ… Posted new message (no ID returned)")
        except Exception as e:
            # The message was delivered; only the ID bookkeeping failed.
            print(f"āœ… Posted new message (couldn't parse ID: {e})")
        return True
    except Exception as e:
        print(f"āŒ Exception posting message: {e}")
        return False


def edit_message(message_id, content):
    """Edit an existing Discord message.

    Returns:
        True on success,
        "not_found" when Discord reports Unknown Message (10008),
        False on other errors.
    """
    try:
        # Extract webhook parts: the last two path segments are used as the
        # webhook ID and token.
        webhook_parts = WEBHOOK_URL.rstrip('/').split('/')
        if len(webhook_parts) < 2:
            print("āŒ Invalid webhook URL format")
            return False

        webhook_id = webhook_parts[-2]
        webhook_token = webhook_parts[-1]
        edit_url = f"https://discord.com/api/webhooks/{webhook_id}/{webhook_token}/messages/{message_id}"

        response = requests.patch(edit_url, json=content, timeout=10)

        if response.status_code == 200:
            print(f"āœ… Updated message: {message_id}")
            return True

        if response.status_code == 404:
            try:
                err = response.json()
                if err.get('code') == 10008:
                    print(f"ā„¹ļø Stored message {message_id} no longer exists, will post a new message")
                    return "not_found"
            except (ValueError, AttributeError):
                # Unparseable error body: report it as a generic failure.
                pass

        print(f"āŒ Error editing message: {response.status_code}")
        print(response.text)
        return False
    except Exception as e:
        print(f"āŒ Exception editing message: {e}")
        return False


def _collect_snapshot(players):
    """Fetch stats and presence for every player, logging one line each."""
    players_data = {}
    presence_data = {}

    for player in players:
        stats = fetch_player_stats(player)
        presence = fetch_player_presence(player)
        players_data[player] = stats
        presence_data[player] = presence

        status_icon = get_presence_icon(
            presence.get('location', 'Offline'),
            presence.get('online', False),
        )
        if stats:
            print(f" āœ… {player} {status_icon} {presence.get('location', 'Offline')}")
        else:
            print(f" āš ļø {player} {status_icon} {presence.get('location', 'Offline')}")

    return players_data, presence_data


def _deliver(message_content):
    """Edit the stored message, or post a new one. Returns True on success."""
    message_id = get_saved_message_id()
    if not message_id:
        print("šŸ“® No message ID found, posting new message")
        return post_message(message_content)

    print(f"šŸ“ Editing existing message: {message_id}")
    edit_result = edit_message(message_id, message_content)
    if edit_result == "not_found":
        print("šŸ“® Posting replacement message")
        return post_message(message_content)
    return edit_result is True


def _run_cycle(players):
    """Run one fetch / compare / publish cycle."""
    print(f"\nā° [{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Fetching stats...")
    players_data, presence_data = _collect_snapshot(players)
    current_snapshot = {'stats': players_data, 'presence': presence_data}

    if not stats_changed(get_cached_stats(), current_snapshot):
        print("šŸ“ No changes detected, skipping update")
        return

    message_content = format_stats_message(players_data, presence_data)
    if _deliver(message_content):
        # Only remember the snapshot once Discord actually shows it.
        # Saving after a failed delivery would make the next cycle see
        # "no changes" and never retry.
        save_cached_stats(current_snapshot)
        print("šŸ’¾ Cached stats saved")
    else:
        print("Update was not delivered; it will be retried on the next cycle")


def main():
    """Main loop to fetch and update stats"""
    if not WEBHOOK_URL:
        print("āŒ Error: DISCORD_WEBHOOK_URL not set in .env file")
        return

    print("šŸš€ Starting stats tracker...")
    print(f"šŸ“Ø Webhook: {WEBHOOK_URL[:50]}...")
    print(f"ā±ļø Refresh interval: {REFRESH_INTERVAL} seconds")

    # Load players
    try:
        players = load_players()
    except (OSError, ValueError) as e:
        print(f"āŒ Could not load players.json: {e}")
        return

    if not players:
        print("āŒ No players found in players.json")
        return

    print(f"šŸ‘„ Tracking {len(players)} players: {', '.join(players)}")
    print("-" * 50)

    # The KeyboardInterrupt handler wraps the whole loop so that Ctrl-C is
    # handled cleanly even while sleeping in the error back-off below.
    try:
        while True:
            try:
                _run_cycle(players)
                # Wait for next update
                sleep_until_next_interval()
            except Exception as e:
                print(f"āŒ Unexpected error: {e}")
                print(f"šŸ’¤ Retrying in {REFRESH_INTERVAL}s...")
                time.sleep(REFRESH_INTERVAL)
    except KeyboardInterrupt:
        print("\n\nšŸ‘‹ Shutting down stats tracker...")


if __name__ == '__main__':
    main()