stats-tracker/stats.py

439 lines
15 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import json
import os
import time
import requests
from dotenv import load_dotenv
from datetime import datetime, timezone, timedelta
# Load environment variables
load_dotenv()
WEBHOOK_URL = os.getenv('DISCORD_WEBHOOK_URL')
API_KEY = os.getenv('API_KEY')
API_BASE_URL = 'https://api.donutsmp.net/v1/stats'
LOOKUP_BASE_URL = 'https://api.donutsmp.net/v1/lookup'
MESSAGE_ID_FILE = 'message_id.txt'
CACHE_FILE = 'stats_cache.json'
REFRESH_INTERVAL = 60 # 1 minute in seconds
def sleep_until_next_interval():
"""Sleep until the next even 1-minute mark on the clock (e.g. :00, :01, :02...)"""
now = datetime.now()
# Seconds elapsed in the current 1-minute block
elapsed = (now.minute % 1) * 60 + now.second
wait = REFRESH_INTERVAL - elapsed
next_tick = now + timedelta(seconds=wait)
print(f"💤 Sleeping {wait}s until next update at {next_tick.strftime('%H:%M:%S')}...")
time.sleep(wait)
def load_players():
"""Load player list from JSON file"""
with open('players.json', 'r') as f:
data = json.load(f)
return data.get('players', [])
def fetch_player_stats(username):
"""Fetch stats for a single player from the API"""
try:
url = f'{API_BASE_URL}/{username}'
headers = {'Authorization': API_KEY} if API_KEY else {}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
return data.get('result', None)
elif response.status_code == 500:
return None # Player doesn't exist
else:
print(f"Error fetching stats for {username}: {response.status_code}")
return None
except Exception as e:
print(f"Exception fetching stats for {username}: {e}")
return None
def fetch_player_presence(username):
"""Fetch online/location presence data for a single player from the lookup API"""
try:
url = f'{LOOKUP_BASE_URL}/{username}'
headers = {
'accept': 'application/json'
}
if API_KEY:
headers['Authorization'] = API_KEY
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
result = data.get('result', {})
return {
'online': True,
'location': result.get('location', 'Unknown'),
'rank': result.get('rank', 'Unknown')
}
if response.status_code == 500:
# API returns 500 with this specific message when user is offline.
try:
data = response.json()
if data.get('message') == 'This user is not currently online.':
return {
'online': False,
'location': 'Offline',
'rank': 'Unknown'
}
except Exception:
pass
if response.status_code == 401:
print(f"Error fetching presence for {username}: Unauthorized (check API_KEY)")
else:
print(f"Error fetching presence for {username}: {response.status_code}")
return {
'online': False,
'location': 'Offline',
'rank': 'Unknown'
}
except Exception as e:
print(f"Exception fetching presence for {username}: {e}")
return {
'online': False,
'location': 'Offline',
'rank': 'Unknown'
}
def get_presence_icon(location, online):
"""Map DonutSMP location to Discord emoji state"""
if not online:
return ''
normalized = str(location).strip().lower()
if normalized in {'limbo', 'limbos'}:
return '💤'
if normalized == 'spawn':
return '🟡'
if normalized in {'overworld', 'nether', 'end'}:
return '🟢'
return '🟢'
def get_presence_shortcode(location, online):
"""Map DonutSMP location to Discord shortcode emoji for table rendering."""
if not online:
return ':black_circle:'
normalized = str(location).strip().lower()
if normalized in {'limbo', 'limbos'}:
return ':zzz:'
if normalized == 'spawn':
return ':yellow_circle:'
if normalized in {'overworld', 'nether', 'end'}:
return ':green_circle:'
return ':green_circle:'
def format_number(value):
"""Format a number with K/M/B suffixes for readability"""
if value == 'N/A' or value == '':
return value
try:
# Try to convert to int first
num = int(value)
if num >= 1_000_000_000:
return f"{num / 1_000_000_000:.2f} B"
elif num >= 1_000_000:
return f"{num / 1_000_000:.2f} M"
elif num >= 1_000:
return f"{num / 1_000:.2f} K"
else:
return str(num)
except (ValueError, TypeError):
# If it's not a plain number (like playtime with format), return as-is
return str(value)
def format_playtime(milliseconds):
"""Format playtime from milliseconds to readable format (XdYhZm)"""
if milliseconds == 'N/A' or milliseconds == '':
return milliseconds
try:
# Convert milliseconds to seconds
total_seconds = int(milliseconds) // 1000
days = total_seconds // 86400
hours = (total_seconds % 86400) // 3600
minutes = (total_seconds % 3600) // 60
parts = []
if days > 0:
parts.append(f"{days}d")
if hours > 0:
parts.append(f"{hours}h")
if minutes > 0:
parts.append(f"{minutes}m")
return " ".join(parts) if parts else "0m"
except (ValueError, TypeError):
return str(milliseconds)
def format_stats_message(players_data, presence_data):
"""Format player stats as Discord embed fields (one field per column, inline)."""
col_players = []
col_money = []
col_shards = []
total_money = 0
total_spawners = 0
for username, stats in players_data.items():
presence = presence_data.get(username, {'online': False, 'location': 'Offline'})
status = get_presence_shortcode(presence.get('location', 'Offline'), presence.get('online', False))
if stats:
raw_money = stats.get('money', 0) or 0
raw_shards = stats.get('shards', 0) or 0
try:
total_money += int(raw_money)
spawners = int(raw_shards) // 1500
total_spawners += spawners
except (ValueError, TypeError):
spawners = 0
money = format_number(raw_money)
shards_display = f"{format_number(raw_shards)} ({spawners} <:spawner:1411753406070263958> )"
else:
money = shards_display = ''
col_players.append(f"{status} {username}")
col_money.append(money)
col_shards.append(shards_display)
# Append totals after a divider line
col_players.append("─────")
col_money.append(f"**{format_number(total_money)}**")
col_shards.append(f"**{total_spawners} <:spawner:1411753406070263958>**")
def field(name, values, inline=True):
return {"name": name, "value": "\n".join(values), "inline": inline}
embed = {
"title": "📊 Server Stats Tracker",
"color": 3447003,
"timestamp": datetime.now(timezone.utc).isoformat(),
"fields": [
field(":green_circle: Player", col_players),
field(":moneybag: Money", col_money),
field(":crystal_ball: Shards", col_shards),
]
}
return {"embeds": [embed]}
def get_saved_message_id():
"""Retrieve the saved Discord message ID"""
if os.path.exists(MESSAGE_ID_FILE):
try:
with open(MESSAGE_ID_FILE, 'r') as f:
return f.read().strip()
except:
return None
return None
def get_cached_stats():
"""Retrieve the cached stats from last update"""
if os.path.exists(CACHE_FILE):
try:
with open(CACHE_FILE, 'r') as f:
return json.load(f)
except:
return None
return None
def save_cached_stats(players_data):
"""Save the current stats to cache file"""
with open(CACHE_FILE, 'w') as f:
json.dump(players_data, f)
def stats_changed(old_data, new_data):
"""Check if stats have changed between updates"""
if old_data is None:
print(" First run - no cached stats")
return True # First run
# Backward compatibility for old cache format.
if not isinstance(old_data, dict) or 'stats' not in old_data or 'presence' not in old_data:
print(" Cache format changed")
return True
if old_data != new_data:
print(" Stats or presence changed")
return True
return False
def save_message_id(message_id):
"""Save the Discord message ID for future edits"""
with open(MESSAGE_ID_FILE, 'w') as f:
f.write(str(message_id))
def post_message(content):
"""Post a new message to Discord webhook"""
try:
# Use wait=true to get the message ID in response
url = f"{WEBHOOK_URL}?wait=true"
response = requests.post(url, json=content, timeout=10)
if response.status_code in [200, 204]:
# Discord returns the message data in JSON
try:
message_data = response.json()
message_id = message_data.get('id')
if message_id:
save_message_id(message_id)
print(f"✅ Posted new message: {message_id}")
else:
print("✅ Posted new message (no ID returned)")
return True
except Exception as e:
print(f"✅ Posted new message (couldn't parse ID: {e})")
return True
else:
print(f"❌ Error posting message: {response.status_code}")
print(response.text)
return False
except Exception as e:
print(f"❌ Exception posting message: {e}")
return False
def edit_message(message_id, content):
"""Edit an existing Discord message.
Returns:
True on success,
"not_found" when Discord reports Unknown Message (10008),
False on other errors.
"""
try:
# Extract webhook parts
webhook_parts = WEBHOOK_URL.rstrip('/').split('/')
if len(webhook_parts) < 2:
print("❌ Invalid webhook URL format")
return False
webhook_id = webhook_parts[-2]
webhook_token = webhook_parts[-1]
edit_url = f"https://discord.com/api/webhooks/{webhook_id}/{webhook_token}/messages/{message_id}"
response = requests.patch(edit_url, json=content, timeout=10)
if response.status_code == 200:
print(f"✅ Updated message: {message_id}")
return True
else:
if response.status_code == 404:
try:
err = response.json()
if err.get('code') == 10008:
print(f" Stored message {message_id} no longer exists, will post a new message")
return "not_found"
except Exception:
pass
print(f"❌ Error editing message: {response.status_code}")
print(response.text)
return False
except Exception as e:
print(f"❌ Exception editing message: {e}")
return False
def main():
"""Main loop to fetch and update stats"""
if not WEBHOOK_URL:
print("❌ Error: DISCORD_WEBHOOK_URL not set in .env file")
return
print("🚀 Starting stats tracker...")
print(f"📨 Webhook: {WEBHOOK_URL[:50]}...")
print(f"⏱️ Refresh interval: {REFRESH_INTERVAL} seconds")
# Load players
players = load_players()
if not players:
print("❌ No players found in players.json")
return
print(f"👥 Tracking {len(players)} players: {', '.join(players)}")
print("-" * 50)
while True:
try:
# Fetch stats for all players
print(f"\n⏰ [{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Fetching stats...")
players_data = {}
presence_data = {}
for player in players:
stats = fetch_player_stats(player)
presence = fetch_player_presence(player)
players_data[player] = stats
presence_data[player] = presence
status_icon = get_presence_icon(presence.get('location', 'Offline'), presence.get('online', False))
if stats:
print(f"{player} {status_icon} {presence.get('location', 'Offline')}")
else:
print(f" ⚠️ {player} {status_icon} {presence.get('location', 'Offline')}")
current_snapshot = {
'stats': players_data,
'presence': presence_data
}
# Check if stats changed
cached_stats = get_cached_stats()
has_changed = stats_changed(cached_stats, current_snapshot)
if not has_changed:
print("📍 No changes detected, skipping update")
sleep_until_next_interval()
continue
# Format message
message_content = format_stats_message(players_data, presence_data)
# Check if we have a previous message ID
message_id = get_saved_message_id()
if message_id:
# Edit existing message
print(f"📝 Editing existing message: {message_id}")
edit_result = edit_message(message_id, message_content)
if edit_result == "not_found":
print("📮 Posting replacement message")
post_message(message_content)
else:
# Post new message
print("📮 No message ID found, posting new message")
post_message(message_content)
# Save stats for next comparison
save_cached_stats(current_snapshot)
print("💾 Cached stats saved")
# Wait for next update
sleep_until_next_interval()
except KeyboardInterrupt:
print("\n\n👋 Shutting down stats tracker...")
break
except Exception as e:
print(f"❌ Unexpected error: {e}")
print(f"💤 Retrying in {REFRESH_INTERVAL}s...")
time.sleep(REFRESH_INTERVAL)
if __name__ == '__main__':
main()