stats-tracker/main.py
ZareMate 317c29879c Add initial implementation of Stats Tracker with environment configuration and player stats tracking
- Create .env.example for environment variable setup
- Add .gitignore to exclude sensitive files and cache
- Implement README.md with setup instructions and usage details
- Develop main.py for fetching and posting player stats to Discord
- Create overlay.py for displaying stats in a GUI overlay
- Add players.json to define players to track
- Specify dependencies in requirements.txt
2026-03-23 19:37:38 +01:00

350 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import json
import os
import time
import requests
from dotenv import load_dotenv
from datetime import datetime, timezone, timedelta
# Load environment variables (the settings below are read via os.getenv)
load_dotenv()
# Discord webhook URL used to post the stats message and to edit it later
WEBHOOK_URL = os.getenv('DISCORD_WEBHOOK_URL')
# Optional API key; when set it is sent as the Authorization header
API_KEY = os.getenv('API_KEY')
# Base URL of the stats API; the player name is appended as a path segment
API_BASE_URL = 'https://api.donutsmp.net/v1/stats'
# File that stores the ID of the Discord message to edit on later updates
MESSAGE_ID_FILE = 'message_id.txt'
# File that stores the stats from the last update, used for change detection
CACHE_FILE = 'stats_cache.json'
REFRESH_INTERVAL = 60 # Seconds between updates (1 minute)
def sleep_until_next_interval():
    """Sleep until the next REFRESH_INTERVAL boundary on the local clock.

    Boundaries are counted from local midnight. With the default 60-second
    interval this wakes at every full minute; with e.g. 300 seconds it wakes
    at :00, :05, :10, ... For intervals that do not divide a day evenly the
    alignment restarts at midnight.
    """
    now = datetime.now()
    # Whole seconds since local midnight; the remainder is how far we are
    # into the current interval block. (The previous `now.minute % 1` was
    # always 0, so alignment only worked for a 60-second interval.)
    seconds_today = now.hour * 3600 + now.minute * 60 + now.second
    elapsed = seconds_today % REFRESH_INTERVAL
    # Always in 1..REFRESH_INTERVAL: exactly on a boundary we wait a full
    # interval rather than firing twice in a row.
    wait = REFRESH_INTERVAL - elapsed
    next_tick = now + timedelta(seconds=wait)
    print(f"💤 Sleeping {wait}s until next update at {next_tick.strftime('%H:%M:%S')}...")
    time.sleep(wait)
def load_players():
    """Return the list stored under the "players" key of players.json.

    An empty list is returned when the key is absent.
    """
    with open('players.json', 'r') as players_file:
        config = json.loads(players_file.read())
    return config.get('players', [])
def fetch_player_stats(username):
    """Request one player's stats from the API.

    Returns the "result" entry of the JSON body on HTTP 200 and None in every
    other case (non-200 status, network failure, unparsable body).
    """
    try:
        endpoint = f'{API_BASE_URL}/{username}'
        # Only send the Authorization header when an API key is configured
        auth_headers = {}
        if API_KEY:
            auth_headers['Authorization'] = API_KEY
        response = requests.get(endpoint, headers=auth_headers, timeout=10)
        status = response.status_code
        if status == 200:
            payload = response.json()
            return payload.get('result', None)
        # A 500 is treated as "player doesn't exist" and is not logged
        if status != 500:
            print(f"Error fetching stats for {username}: {response.status_code}")
        return None
    except Exception as e:
        print(f"Exception fetching stats for {username}: {e}")
        return None
def format_number(value):
    """Format a number with K/M/B suffixes for readability.

    Args:
        value: An int, a numeric string, or a placeholder such as 'N/A'.

    Returns:
        'N/A' and '' are returned unchanged. Values that int() accepts are
        returned as e.g. "1.50 K", "2.50 M", "1.00 B", or as a plain integer
        string when the magnitude is below 1000. Anything else is returned
        as str(value).
    """
    if value == 'N/A' or value == '':
        return value
    try:
        num = int(value)
    except (ValueError, TypeError, OverflowError):
        # Not a plain number (e.g. pre-formatted text, None, inf): show as-is
        return str(value)

    units = ((1_000_000_000, "B"), (1_000_000, "M"), (1_000, "K"))
    # Pick the unit by magnitude so negative values get a suffix too
    magnitude = abs(num)
    for index, (threshold, suffix) in enumerate(units):
        if magnitude < threshold:
            continue
        text = f"{num / threshold:.2f}"
        # Rounding to two decimals can reach 1000.00 (e.g. 999_999 -> "1000.00 K");
        # promote to the next larger unit so that reads "1.00 M" instead.
        if index > 0 and abs(float(text)) >= 1000:
            threshold, suffix = units[index - 1]
            text = f"{num / threshold:.2f}"
        return f"{text} {suffix}"
    return str(num)
def format_playtime(milliseconds):
    """Turn a millisecond count into a compact "Xd Yh Zm" string.

    Zero-valued units are omitted, and "0m" is returned when nothing is left.
    The placeholders 'N/A' and '' pass through unchanged; values that cannot
    be converted with int() are returned as str(milliseconds).
    """
    if milliseconds in ('N/A', ''):
        return milliseconds
    try:
        total_seconds = int(milliseconds) // 1000
    except (ValueError, TypeError):
        return str(milliseconds)

    # Peel off days, then hours, then minutes; leftover seconds are dropped
    days, remainder = divmod(total_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes = remainder // 60

    labelled = ((days, "d"), (hours, "h"), (minutes, "m"))
    pieces = [f"{amount}{unit}" for amount, unit in labelled if amount > 0]
    return " ".join(pieces) or "0m"
def format_stats_message(players_data):
    """Build the Discord webhook payload: one embed with a field per stat.

    Args:
        players_data: Mapping of username to that player's stats dict, or to
            a falsy value when no stats are available for the player.

    Returns:
        A dict of the form {"embeds": [embed]}. Each embed field lists every
        player's value for one stat; summable stats also get a total line.
    """
    timestamp = datetime.now(timezone.utc).isoformat()

    # Field title (with icon) -> key in each player's stats dict
    stat_groups = {
        "💰 Money": "money",
        "⏱️ Playtime": "playtime",
        "⚔️ Kills": "kills",
        "☠️ Deaths": "deaths",
        "🧟 Mobs Killed": "mobs_killed",
        "🪨 Blocks Placed": "placed_blocks",
        "⛏️ Blocks Broken": "broken_blocks",
        "✨ Shards": "shards",
    }
    # Stats for which a combined total is shown
    summable_stats = {"money"}

    fields = []
    for group_name, stat_key in stat_groups.items():
        # Playtime has its own formatter; everything else is a plain number
        formatter = format_playtime if stat_key == 'playtime' else format_number
        can_sum = stat_key in summable_stats
        total = 0
        lines = []
        for username, stats in players_data.items():
            if stats:
                value = stats.get(stat_key, 'N/A')
                formatted_value = formatter(value)
            else:
                value, formatted_value = 'N/A', ""
            lines.append(f"{username}: `{formatted_value}`\n")
            if can_sum and value not in ('N/A', ''):
                try:
                    total += int(value)
                except (ValueError, TypeError):
                    # One non-numeric value disables the total for this stat
                    can_sum = False
        field_value = "".join(lines)
        if can_sum and total > 0:
            field_value += f"\n**Total**: `{format_number(total)}`"
        fields.append({
            "name": group_name,
            "value": field_value.rstrip(),
            "inline": True
        })

    embed = {
        "title": "📊 Server Stats Tracker",
        "description": "Real-time player statistics",
        "color": 3447003,
        "timestamp": timestamp,
        "fields": fields
    }
    return {"embeds": [embed]}
def get_saved_message_id():
    """Retrieve the saved Discord message ID.

    Returns:
        The stripped contents of MESSAGE_ID_FILE, or None when the file is
        missing, unreadable, or empty.
    """
    # Open directly instead of checking os.path.exists first: avoids the
    # check-then-open race and a second filesystem lookup.
    try:
        with open(MESSAGE_ID_FILE, 'r') as f:
            message_id = f.read().strip()
    except FileNotFoundError:
        return None
    except (OSError, UnicodeDecodeError) as e:
        # Catch only read failures; a bare except would also swallow
        # KeyboardInterrupt and SystemExit.
        print(f"⚠️ Could not read {MESSAGE_ID_FILE}: {e}")
        return None
    # An empty file means "no saved ID"
    return message_id or None
def get_cached_stats():
    """Retrieve the cached stats from the last update.

    Returns:
        The dict stored in CACHE_FILE, or None when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
    """
    try:
        with open(CACHE_FILE, 'r') as f:
            cached = json.load(f)
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; a bare except would also swallow KeyboardInterrupt.
        print(f"⚠️ Ignoring unreadable cache {CACHE_FILE}: {e}")
        return None
    # Callers compare the cache key-by-key, so anything other than a dict is
    # treated as "no cache" instead of failing on every cycle.
    if not isinstance(cached, dict):
        return None
    return cached
def save_cached_stats(players_data):
    """Write players_data to CACHE_FILE as JSON, replacing any previous content."""
    with open(CACHE_FILE, mode='w') as cache_file:
        json.dump(players_data, fp=cache_file)
def stats_changed(old_data, new_data):
    """Report whether new_data differs from the previously cached old_data.

    Returns True when there is no cache yet (old_data is None), when the set
    of tracked players differs, when a player's stats became available or
    unavailable, or when a player's stats differ. Prints the reason for the
    first difference found. Returns False otherwise.
    """
    if old_data is None:
        print(" First run - no cached stats")
        return True

    if set(old_data) != set(new_data):
        print(" Players list changed")
        return True

    for player, current in new_data.items():
        previous = old_data.get(player)
        if previous is None and current is None:
            # Unavailable before and still unavailable: nothing changed
            continue
        if previous is None or current is None:
            print(f" {player}: stats availability changed")
            return True
        if previous != current:
            print(f" {player}: stats changed")
            return True
    return False
def save_message_id(message_id):
    """Store message_id in MESSAGE_ID_FILE so later runs can edit that message."""
    with open(MESSAGE_ID_FILE, mode='w') as id_file:
        id_text = str(message_id)
        id_file.write(id_text)
def post_message(content):
    """Post a new message to the Discord webhook.

    Args:
        content: JSON-serialisable webhook payload (e.g. {"embeds": [...]}).

    Returns:
        True when Discord accepted the message (HTTP 200 or 204), even if the
        message ID could not be read or saved; False on any other status or
        when the request itself failed.
    """
    try:
        # wait=true asks Discord to return the created message so its ID can
        # be saved. It is passed via `params` so it merges correctly when
        # WEBHOOK_URL already carries a query string (e.g. ?thread_id=...).
        response = requests.post(
            WEBHOOK_URL,
            params={'wait': 'true'},
            json=content,
            timeout=10,
        )
        if response.status_code in [200, 204]:
            # Discord returns the message data in JSON
            try:
                message_data = response.json()
                message_id = message_data.get('id')
                if message_id:
                    save_message_id(message_id)
                    print(f"✅ Posted new message: {message_id}")
                else:
                    print("✅ Posted new message (no ID returned)")
                return True
            except Exception as e:
                # The message was posted; only the ID bookkeeping failed
                print(f"✅ Posted new message (couldn't parse ID: {e})")
                return True
        else:
            print(f"❌ Error posting message: {response.status_code}")
            print(response.text)
            return False
    except Exception as e:
        print(f"❌ Exception posting message: {e}")
        return False
def edit_message(message_id, content):
    """Edit an existing Discord message that was posted through the webhook.

    Args:
        message_id: ID of the message to edit.
        content: JSON-serialisable webhook payload that replaces the message.

    Returns:
        True when Discord answered HTTP 200, False otherwise (invalid webhook
        URL, any other status code, or a failed request).
    """
    from urllib.parse import urlsplit, urlunsplit

    try:
        # Derive the edit endpoint from the configured webhook URL so that
        # its host, API path prefix and query string (e.g. ?thread_id=...)
        # are kept. Splitting the raw URL on "/" would fold any query string
        # into the token.
        parts = urlsplit(WEBHOOK_URL)
        base_path = parts.path.rstrip('/')
        segments = [segment for segment in base_path.split('/') if segment]
        # A webhook path must end in at least .../<webhook id>/<token>
        if len(segments) < 2:
            print("❌ Invalid webhook URL format")
            return False
        edit_url = urlunsplit(parts._replace(
            path=f"{base_path}/messages/{message_id}",
            fragment='',
        ))
        response = requests.patch(edit_url, json=content, timeout=10)
        if response.status_code == 200:
            print(f"✅ Updated message: {message_id}")
            return True
        else:
            print(f"❌ Error editing message: {response.status_code}")
            print(response.text)
            return False
    except Exception as e:
        print(f"❌ Exception editing message: {e}")
        return False
def main():
    """Main loop to fetch and update stats.

    Each cycle fetches stats for every tracked player, skips the Discord
    update when nothing changed since the cached stats, and otherwise edits
    the saved message (or posts a new one when no message ID is saved).
    Runs until interrupted with Ctrl+C.
    """
    if not WEBHOOK_URL:
        print("❌ Error: DISCORD_WEBHOOK_URL not set in .env file")
        return
    print("🚀 Starting stats tracker...")
    print(f"📨 Webhook: {WEBHOOK_URL[:50]}...")
    print(f"⏱️ Refresh interval: {REFRESH_INTERVAL} seconds")
    # Load players
    players = load_players()
    if not players:
        print("❌ No players found in players.json")
        return
    print(f"👥 Tracking {len(players)} players: {', '.join(players)}")
    print("-" * 50)
    # The KeyboardInterrupt handler wraps the whole loop so that Ctrl+C is
    # also handled cleanly while sleeping in the error-retry path below.
    try:
        while True:
            try:
                # Fetch stats for all players
                print(f"\n⏰ [{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Fetching stats...")
                players_data = {}
                for player in players:
                    stats = fetch_player_stats(player)
                    players_data[player] = stats
                    if stats:
                        print(f"{player}")
                    else:
                        print(f" ⚠️ {player}")
                # Check if stats changed
                cached_stats = get_cached_stats()
                has_changed = stats_changed(cached_stats, players_data)
                if not has_changed:
                    print("📍 No changes detected, skipping update")
                    sleep_until_next_interval()
                    continue
                # Format message
                message_content = format_stats_message(players_data)
                # Check if we have a previous message ID
                message_id = get_saved_message_id()
                if message_id:
                    # Edit existing message
                    print(f"📝 Editing existing message: {message_id}")
                    updated = edit_message(message_id, message_content)
                else:
                    # Post new message
                    print("📮 No message ID found, posting new message")
                    updated = post_message(message_content)
                if updated:
                    # Save stats for next comparison
                    save_cached_stats(players_data)
                    print("💾 Cached stats saved")
                else:
                    # Leave the cache untouched so the same change is detected
                    # and retried next cycle instead of being silently lost.
                    print("⚠️ Discord update failed, will retry on the next cycle")
                # Wait for next update
                sleep_until_next_interval()
            except Exception as e:
                print(f"❌ Unexpected error: {e}")
                print(f"💤 Retrying in {REFRESH_INTERVAL}s...")
                time.sleep(REFRESH_INTERVAL)
    except KeyboardInterrupt:
        print("\n\n👋 Shutting down stats tracker...")


if __name__ == '__main__':
    main()