476 lines
16 KiB
Python
476 lines
16 KiB
Python
#!/usr/bin/env python3
|
||
import json
|
||
import os
|
||
import time
|
||
import requests
|
||
from dotenv import load_dotenv
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
# Load environment variables
|
||
load_dotenv()
|
||
|
||
WEBHOOK_URL = os.getenv('DISCORD_WEBHOOK_URL')
|
||
API_KEY = os.getenv('API_KEY')
|
||
API_BASE_URL = 'https://api.donutsmp.net/v1/stats'
|
||
LOOKUP_BASE_URL = 'https://api.donutsmp.net/v1/lookup'
|
||
MESSAGE_ID_FILE = 'message_id.txt'
|
||
CACHE_FILE = 'stats_cache.json'
|
||
REFRESH_INTERVAL = 60 # 1 minute in seconds
|
||
|
||
def sleep_until_next_interval():
|
||
"""Sleep until the next even 1-minute mark on the clock (e.g. :00, :01, :02...)"""
|
||
now = datetime.now()
|
||
# Seconds elapsed in the current 1-minute block
|
||
elapsed = (now.minute % 1) * 60 + now.second
|
||
wait = REFRESH_INTERVAL - elapsed
|
||
next_tick = now + timedelta(seconds=wait)
|
||
print(f"💤 Sleeping {wait}s until next update at {next_tick.strftime('%H:%M:%S')}...")
|
||
time.sleep(wait)
|
||
|
||
def load_players():
|
||
"""Load player list from JSON file"""
|
||
with open('players.json', 'r') as f:
|
||
data = json.load(f)
|
||
return data.get('players', [])
|
||
|
||
def fetch_player_stats(username):
|
||
"""Fetch stats for a single player from the API"""
|
||
try:
|
||
url = f'{API_BASE_URL}/{username}'
|
||
headers = {'Authorization': API_KEY} if API_KEY else {}
|
||
response = requests.get(url, headers=headers, timeout=10)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
return data.get('result', None)
|
||
elif response.status_code == 500:
|
||
return None # Player doesn't exist
|
||
else:
|
||
print(f"Error fetching stats for {username}: {response.status_code}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"Exception fetching stats for {username}: {e}")
|
||
return None
|
||
|
||
def fetch_player_presence(username):
|
||
"""Fetch online/location presence data for a single player from the lookup API"""
|
||
try:
|
||
url = f'{LOOKUP_BASE_URL}/{username}'
|
||
headers = {
|
||
'accept': 'application/json'
|
||
}
|
||
if API_KEY:
|
||
headers['Authorization'] = API_KEY
|
||
|
||
response = requests.get(url, headers=headers, timeout=10)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
result = data.get('result', {})
|
||
return {
|
||
'online': True,
|
||
'location': result.get('location', 'Unknown'),
|
||
'rank': result.get('rank', 'Unknown')
|
||
}
|
||
|
||
if response.status_code == 500:
|
||
# API returns 500 with this specific message when user is offline.
|
||
try:
|
||
data = response.json()
|
||
if data.get('message') == 'This user is not currently online.':
|
||
return {
|
||
'online': False,
|
||
'location': 'Offline',
|
||
'rank': 'Unknown'
|
||
}
|
||
except Exception:
|
||
pass
|
||
|
||
if response.status_code == 401:
|
||
print(f"Error fetching presence for {username}: Unauthorized (check API_KEY)")
|
||
else:
|
||
print(f"Error fetching presence for {username}: {response.status_code}")
|
||
|
||
return {
|
||
'online': False,
|
||
'location': 'Offline',
|
||
'rank': 'Unknown'
|
||
}
|
||
except Exception as e:
|
||
print(f"Exception fetching presence for {username}: {e}")
|
||
return {
|
||
'online': False,
|
||
'location': 'Offline',
|
||
'rank': 'Unknown'
|
||
}
|
||
|
||
def get_presence_icon(location, online):
|
||
"""Map DonutSMP location to Discord emoji state"""
|
||
if not online:
|
||
return '⚫'
|
||
|
||
normalized = str(location).strip().lower()
|
||
if normalized in {'limbo', 'limbos'}:
|
||
return '💤'
|
||
if normalized == 'spawn':
|
||
return '🟡'
|
||
if normalized in {'overworld', 'nether', 'end'}:
|
||
return '🟢'
|
||
|
||
return '🟢'
|
||
|
||
def get_presence_shortcode(location, online):
|
||
"""Map DonutSMP location to Discord shortcode emoji for table rendering."""
|
||
if not online:
|
||
return ':black_circle:'
|
||
|
||
normalized = str(location).strip().lower()
|
||
if normalized in {'limbo', 'limbos'}:
|
||
return ':zzz:'
|
||
if normalized == 'spawn':
|
||
return ':yellow_circle:'
|
||
if normalized in {'overworld', 'nether', 'end'}:
|
||
return ':green_circle:'
|
||
|
||
return ':green_circle:'
|
||
|
||
def format_number(value):
|
||
"""Format a number with K/M/B suffixes for readability"""
|
||
if value == 'N/A' or value == '❌':
|
||
return value
|
||
|
||
try:
|
||
# Try to convert to int first
|
||
num = int(value)
|
||
|
||
if num >= 1_000_000_000:
|
||
return f"{num / 1_000_000_000:.2f} B"
|
||
elif num >= 1_000_000:
|
||
return f"{num / 1_000_000:.2f} M"
|
||
elif num >= 1_000:
|
||
return f"{num / 1_000:.2f} K"
|
||
else:
|
||
return str(num)
|
||
except (ValueError, TypeError):
|
||
# If it's not a plain number (like playtime with format), return as-is
|
||
return str(value)
|
||
|
||
def format_playtime(milliseconds):
|
||
"""Format playtime from milliseconds to readable format (XdYhZm)"""
|
||
if milliseconds == 'N/A' or milliseconds == '❌':
|
||
return milliseconds
|
||
|
||
try:
|
||
# Convert milliseconds to seconds
|
||
total_seconds = int(milliseconds) // 1000
|
||
|
||
days = total_seconds // 86400
|
||
hours = (total_seconds % 86400) // 3600
|
||
minutes = (total_seconds % 3600) // 60
|
||
|
||
parts = []
|
||
if days > 0:
|
||
parts.append(f"{days}d")
|
||
if hours > 0:
|
||
parts.append(f"{hours}h")
|
||
if minutes > 0:
|
||
parts.append(f"{minutes}m")
|
||
|
||
return " ".join(parts) if parts else "0m"
|
||
except (ValueError, TypeError):
|
||
return str(milliseconds)
|
||
|
||
def format_minutes_to_spawner(minutes):
|
||
"""Format minutes to readable format (XdYhZm)"""
|
||
try:
|
||
total_minutes = int(minutes)
|
||
days = total_minutes // 1440
|
||
hours = (total_minutes % 1440) // 60
|
||
mins = total_minutes % 60
|
||
|
||
parts = []
|
||
if days > 0:
|
||
parts.append(f"{days}d")
|
||
if hours > 0:
|
||
parts.append(f"{hours}h")
|
||
if mins > 0:
|
||
parts.append(f"{mins}m")
|
||
|
||
return " ".join(parts) if parts else "0m"
|
||
except (ValueError, TypeError):
|
||
return "N/A"
|
||
|
||
def format_stats_message(players_data, presence_data):
|
||
"""Format player stats as Discord embed fields (one field per column, inline)."""
|
||
|
||
col_players = []
|
||
col_money = []
|
||
col_shards = []
|
||
|
||
total_money = 0
|
||
total_spawners = 0
|
||
total_shards = 0
|
||
min_time_to_next = float('inf')
|
||
|
||
for username, stats in players_data.items():
|
||
presence = presence_data.get(username, {'online': False, 'location': 'Offline'})
|
||
status = get_presence_shortcode(presence.get('location', 'Offline'), presence.get('online', False))
|
||
|
||
if stats:
|
||
raw_money = stats.get('money', 0) or 0
|
||
raw_shards = stats.get('shards', 0) or 0
|
||
raw_shards_int = int(raw_shards) if raw_shards else 0
|
||
try:
|
||
total_money += int(raw_money)
|
||
total_shards += raw_shards_int
|
||
spawners = raw_shards_int // 1500
|
||
total_spawners += spawners
|
||
except (ValueError, TypeError):
|
||
spawners = 0
|
||
raw_shards_int = 0
|
||
|
||
# Calculate time to next spawner
|
||
shards_in_progress = raw_shards_int % 1500
|
||
shards_to_next = 1500 - shards_in_progress
|
||
min_time_to_next = min(min_time_to_next, shards_to_next)
|
||
time_to_next_str = format_minutes_to_spawner(shards_to_next)
|
||
|
||
money = format_number(raw_money)
|
||
shards_display = f"{format_number(raw_shards)} ({spawners} <:spawner:1411753406070263958>) {time_to_next_str}"
|
||
else:
|
||
money = shards_display = '❌'
|
||
|
||
col_players.append(f"{status} {username}")
|
||
col_money.append(money)
|
||
col_shards.append(shards_display)
|
||
|
||
# Use minimum time to next spawner from all players
|
||
if min_time_to_next == float('inf'):
|
||
min_time_to_next = 1500
|
||
min_time_to_next_str = format_minutes_to_spawner(min_time_to_next)
|
||
|
||
# Append totals after a divider line
|
||
col_players.append("─────")
|
||
col_money.append(f"**{format_number(total_money)}**")
|
||
col_shards.append(f"**{total_spawners} <:spawner:1411753406070263958>** {min_time_to_next_str}")
|
||
|
||
def field(name, values, inline=True):
|
||
return {"name": name, "value": "\n".join(values), "inline": inline}
|
||
|
||
embed = {
|
||
"title": "📊 Server Stats Tracker",
|
||
"color": 3447003,
|
||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||
"fields": [
|
||
field(":green_circle: Player", col_players),
|
||
field(":moneybag: Money", col_money),
|
||
field(":crystal_ball: Shards", col_shards),
|
||
]
|
||
}
|
||
|
||
return {"embeds": [embed]}
|
||
|
||
def get_saved_message_id():
|
||
"""Retrieve the saved Discord message ID"""
|
||
if os.path.exists(MESSAGE_ID_FILE):
|
||
try:
|
||
with open(MESSAGE_ID_FILE, 'r') as f:
|
||
return f.read().strip()
|
||
except:
|
||
return None
|
||
return None
|
||
|
||
|
||
def get_cached_stats():
|
||
"""Retrieve the cached stats from last update"""
|
||
if os.path.exists(CACHE_FILE):
|
||
try:
|
||
with open(CACHE_FILE, 'r') as f:
|
||
return json.load(f)
|
||
except:
|
||
return None
|
||
return None
|
||
|
||
def save_cached_stats(players_data):
|
||
"""Save the current stats to cache file"""
|
||
with open(CACHE_FILE, 'w') as f:
|
||
json.dump(players_data, f)
|
||
|
||
def stats_changed(old_data, new_data):
|
||
"""Check if stats have changed between updates"""
|
||
if old_data is None:
|
||
print(" ℹ️ First run - no cached stats")
|
||
return True # First run
|
||
|
||
# Backward compatibility for old cache format.
|
||
if not isinstance(old_data, dict) or 'stats' not in old_data or 'presence' not in old_data:
|
||
print(" ℹ️ Cache format changed")
|
||
return True
|
||
|
||
if old_data != new_data:
|
||
print(" ℹ️ Stats or presence changed")
|
||
return True
|
||
|
||
return False
|
||
def save_message_id(message_id):
|
||
"""Save the Discord message ID for future edits"""
|
||
with open(MESSAGE_ID_FILE, 'w') as f:
|
||
f.write(str(message_id))
|
||
|
||
def post_message(content):
|
||
"""Post a new message to Discord webhook"""
|
||
try:
|
||
# Use wait=true to get the message ID in response
|
||
url = f"{WEBHOOK_URL}?wait=true"
|
||
response = requests.post(url, json=content, timeout=10)
|
||
if response.status_code in [200, 204]:
|
||
# Discord returns the message data in JSON
|
||
try:
|
||
message_data = response.json()
|
||
message_id = message_data.get('id')
|
||
if message_id:
|
||
save_message_id(message_id)
|
||
print(f"✅ Posted new message: {message_id}")
|
||
else:
|
||
print("✅ Posted new message (no ID returned)")
|
||
return True
|
||
except Exception as e:
|
||
print(f"✅ Posted new message (couldn't parse ID: {e})")
|
||
return True
|
||
else:
|
||
print(f"❌ Error posting message: {response.status_code}")
|
||
print(response.text)
|
||
return False
|
||
except Exception as e:
|
||
print(f"❌ Exception posting message: {e}")
|
||
return False
|
||
|
||
def edit_message(message_id, content):
|
||
"""Edit an existing Discord message.
|
||
|
||
Returns:
|
||
True on success,
|
||
"not_found" when Discord reports Unknown Message (10008),
|
||
False on other errors.
|
||
"""
|
||
try:
|
||
# Extract webhook parts
|
||
webhook_parts = WEBHOOK_URL.rstrip('/').split('/')
|
||
if len(webhook_parts) < 2:
|
||
print("❌ Invalid webhook URL format")
|
||
return False
|
||
|
||
webhook_id = webhook_parts[-2]
|
||
webhook_token = webhook_parts[-1]
|
||
|
||
edit_url = f"https://discord.com/api/webhooks/{webhook_id}/{webhook_token}/messages/{message_id}"
|
||
response = requests.patch(edit_url, json=content, timeout=10)
|
||
|
||
if response.status_code == 200:
|
||
print(f"✅ Updated message: {message_id}")
|
||
return True
|
||
else:
|
||
if response.status_code == 404:
|
||
try:
|
||
err = response.json()
|
||
if err.get('code') == 10008:
|
||
print(f"ℹ️ Stored message {message_id} no longer exists, will post a new message")
|
||
return "not_found"
|
||
except Exception:
|
||
pass
|
||
|
||
print(f"❌ Error editing message: {response.status_code}")
|
||
print(response.text)
|
||
return False
|
||
except Exception as e:
|
||
print(f"❌ Exception editing message: {e}")
|
||
return False
|
||
|
||
def main():
|
||
"""Main loop to fetch and update stats"""
|
||
if not WEBHOOK_URL:
|
||
print("❌ Error: DISCORD_WEBHOOK_URL not set in .env file")
|
||
return
|
||
|
||
print("🚀 Starting stats tracker...")
|
||
print(f"📨 Webhook: {WEBHOOK_URL[:50]}...")
|
||
print(f"⏱️ Refresh interval: {REFRESH_INTERVAL} seconds")
|
||
|
||
# Load players
|
||
players = load_players()
|
||
if not players:
|
||
print("❌ No players found in players.json")
|
||
return
|
||
|
||
print(f"👥 Tracking {len(players)} players: {', '.join(players)}")
|
||
print("-" * 50)
|
||
|
||
while True:
|
||
try:
|
||
# Fetch stats for all players
|
||
print(f"\n⏰ [{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Fetching stats...")
|
||
players_data = {}
|
||
presence_data = {}
|
||
|
||
for player in players:
|
||
stats = fetch_player_stats(player)
|
||
presence = fetch_player_presence(player)
|
||
players_data[player] = stats
|
||
presence_data[player] = presence
|
||
|
||
status_icon = get_presence_icon(presence.get('location', 'Offline'), presence.get('online', False))
|
||
if stats:
|
||
print(f" ✅ {player} {status_icon} {presence.get('location', 'Offline')}")
|
||
else:
|
||
print(f" ⚠️ {player} {status_icon} {presence.get('location', 'Offline')}")
|
||
|
||
current_snapshot = {
|
||
'stats': players_data,
|
||
'presence': presence_data
|
||
}
|
||
|
||
# Check if stats changed
|
||
cached_stats = get_cached_stats()
|
||
has_changed = stats_changed(cached_stats, current_snapshot)
|
||
|
||
if not has_changed:
|
||
print("📍 No changes detected, skipping update")
|
||
sleep_until_next_interval()
|
||
continue
|
||
|
||
# Format message
|
||
message_content = format_stats_message(players_data, presence_data)
|
||
|
||
# Check if we have a previous message ID
|
||
message_id = get_saved_message_id()
|
||
|
||
if message_id:
|
||
# Edit existing message
|
||
print(f"📝 Editing existing message: {message_id}")
|
||
edit_result = edit_message(message_id, message_content)
|
||
if edit_result == "not_found":
|
||
print("📮 Posting replacement message")
|
||
post_message(message_content)
|
||
else:
|
||
# Post new message
|
||
print("📮 No message ID found, posting new message")
|
||
post_message(message_content)
|
||
|
||
# Save stats for next comparison
|
||
save_cached_stats(current_snapshot)
|
||
print("💾 Cached stats saved")
|
||
|
||
# Wait for next update
|
||
sleep_until_next_interval()
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n\n👋 Shutting down stats tracker...")
|
||
break
|
||
except Exception as e:
|
||
print(f"❌ Unexpected error: {e}")
|
||
print(f"💤 Retrying in {REFRESH_INTERVAL}s...")
|
||
time.sleep(REFRESH_INTERVAL)
|
||
|
||
if __name__ == '__main__':
|
||
main()
|