minetest-trackr/trackr.py

757 lines
26 KiB
Python
Executable File

#!/usr/bin/env python3
#
# trackr 2.3.0
#
# © 2020-2022 by luk3yx.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see https://www.gnu.org/licenses/.
#
# Usage:
# • Make sure you are using the latest miniirc and miniirc_extras.
# · sudo pip3 install --upgrade miniirc miniirc_extras
# • Create a trackr.ini file similar to the below one.
# • Run the script.
#
# See trackr.example.ini for an example configuration
#
from __future__ import annotations
import hashlib, math, miniirc, miniirc_extras, os, random, sys, time
assert miniirc.ver >= (1,4,3), 'Update miniirc.'
assert miniirc_extras.ver >= (0,2,5), 'Update miniirc_extras.'
from miniirc_extras import AbstractIRC, Hostmask
from miniirc_extras.features.chans import Channel, ModeList, ChannelTracker
from miniirc_extras.features.users import AbstractChannel, User, UserTracker
from typing import Optional, Union
__version__ = '2.3.0'
# Errors
class BotError(Exception):
pass
def err(msg: str, *args, **kwargs) -> None:
if args or kwargs:
msg = msg.format(*args, **kwargs)
raise BotError(msg)
# Get a plural
def plural(n: int) -> str:
return '' if n == 1 else 's'
# Get a valid Lua representation of a string
def _escape_string(x: bytes):
yield '"'
for char in x:
if char == 0x22: # "
yield r'\"'
elif char == 0x5c:
yield r'\\'
elif 0x7f > char > 0x1f:
yield chr(char)
else:
yield '\\' + str(char).zfill(3)
yield '"'
def lua_repr(s: str) -> str:
return ''.join(_escape_string(s.encode('utf-8')))
# A player action error
class ModerationError(Exception):
pass
_durations = {'ms': 0.001, 's': 1, 'm': 60, 'h': 3600, 'd': 86400, 'D': 86400,
'W': 604800, 'M': 2592000, 'Y': 31104000}
def _parse_duration(duration: Union[str, int, float]) -> int:
"""
Returns the duration in seconds. If the specified duration is an int or
float, math.floor(duration) is returned.
"""
if isinstance(duration, str):
for suffix, m in _durations.items():
if duration.endswith(suffix):
duration = duration[:-len(suffix)]
multiplier = m
break
else:
multiplier = 60
try:
duration = float(duration)
except ValueError:
raise ModerationError('Invalid duration!')
else:
multiplier = 1
duration = math.floor(duration * multiplier)
if duration <= 0:
raise ModerationError('The duration must be at least one second!')
return duration
# A player class
class Player(str):
total_warnings: int = 2
warnings: int
__slots__ = ('warnings', '_server')
# Kick the player
def kick(self, sender: str, reason: str) -> None:
assert self._server
self._server.msg(f'cmd kick {self} By {sender}: {reason}')
# Mute the player
def mute(self) -> None:
assert self._server
self._server.msg(f'cmd revoke {self} shout')
# Unmute the player
def unmute(self) -> None:
assert self._server
self._server.msg(f'cmd grant {self} shout')
# Tempmute the player with //lua hacks
def tempmute(self, duration: Union[str, int, float]) -> None:
assert self._server
duration = _parse_duration(duration)
if duration > 7200:
raise ModerationError('You cannot tempmute someone for over 2 '
'hours!')
# Create a hacky lua script
script = (f'cmd /lua local m={lua_repr(self)};'
'core.registered_chatcommands.revoke.func("trackr",m.." shout")'
'local function r() '
'if m then '
'core.registered_chatcommands.grant.func("trackr",'
'm.." shout") '
'end '
'end '
f'core.after({duration},r);'
'core.register_on_shutdown(r)')
self._server.msg(script)
# Warn the player
def warn(self, sender: str, msg: str) -> str:
assert self._server
if self.warnings > 0:
msg2 = '{} warning{} left until you get temp-muted.'.format(
self.warnings, plural(self.warnings))
self.warnings -= 1
else:
self.tempmute(30 * 60)
msg2 = 'been temporarily muted for 30 minutes.'
self.warnings = self.total_warnings
msg = f'{msg}\n -- {sender}\n\nYou have {msg2}'
self._server.msg(f'cmd /lua core.show_formspec({lua_repr(self)},'
'"trackr:warning", "size[8,5;]image[0,0;1,1;bucket_lava.png]'
'image[7,0;1,1;bucket_lava.png]'
'label[1.25,0.25;WARNING - Please read carefully.]'
f'label[0,1.25;" .. minetest.formspec_escape({lua_repr(msg)}) .. "]'
'button_exit[0,4.5;8,0.5;quit;Continue]'
'" .. (default.gui_bg or ""))')
return self + ' has ' + msg2.replace('you', 'they')
# Temporarily ban a player
def tempban(self, sender: str, duration: Union[str, int, float],
reason: str) -> None:
assert self._server
duration = _parse_duration(duration)
if duration > _durations['M']:
raise ModerationError('You cannot tempban someone for longer than '
'1 month!')
self._server.msg(
f'cmd xtempban {self} {duration}s Banned by {sender}@IRC: {reason}'
)
def __repr__(self) -> str:
return f'{type(self).__name__}({super().__repr__()}, {self.warnings})'
def __new__(cls, name: str, warnings: Optional[int] = None, *,
server: Optional[User] = None):
return super().__new__(cls, name) # type: ignore
def __init__(self, name: str, warnings: Optional[int] = None, *,
server: Optional[User] = None) -> None:
assert name
self.warnings: int
if warnings is None:
self.warnings = self.total_warnings
else:
self.warnings = warnings
self._server: Optional[User] = server
# A player list
class PlayerList(dict):
__slots__ = ('server',)
# An easy way to create new players
def Player_(self, name: str, warnings: Optional[int] = None) \
-> Optional[Player]:
if not name:
return None
if name in self:
player = self[name]
else:
player = Player(name, warnings, server = self.server)
self[name] = player
return player
def get(self, key: str, *args):
return super().get(str(key).lower(), *args)
def __getitem__(self, key: str):
return super().__getitem__(str(key).lower())
def __setitem__(self, key: str, value: Player):
return super().__setitem__(str(key).lower(), value)
def __delitem__(self, key: str):
return super().__delitem__(str(key).lower())
def __contains__(self, key) -> bool:
return super().__contains__(str(key).lower())
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.server: Optional[User] = None
Player = Player_
del Player_
# The bot
class Trackr:
cooldown: int = 15
irc: AbstractIRC
# Alias for self.irc.debug
@property
def debug(self):
return self.irc.debug
# Hacks to keep mypy happy
@property
def users(self) -> UserTracker:
return self.irc.users # type: ignore
@property
def chans(self) -> ChannelTracker:
return self.irc.chans # type: ignore
# __init__
def __init__(self, rawconfig: dict[str, dict[str, str]],
debug: bool = False) -> None:
if 'trackr' not in rawconfig:
err('Invalid or non-existent config file!')
config: dict[str, str] = rawconfig['trackr']
del rawconfig
self.config: dict[str, str] = config
self.cooldowns: dict[str, float] = {}
self.cooldown_msgs_sent: set[str] = set()
self._conf_assert('ip', ('ssl_port', int), 'nick', 'channels',
'admins')
self._secret: bytes = config.get('secret', '').encode('utf-8')
self.admins: frozenset[str] = frozenset(map(
lambda n : n.strip().lower(), config['admins'].split(',')))
self.prefix = config.get('prefix', config['nick'] + ': ')
self.server_list: Optional[frozenset[str]] = None
serverlist = config.get('server_list', '').strip()
if serverlist:
self.server_list = frozenset(map(str.strip, serverlist.split(',')))
self.server_mode: str = config.get('server_mode', 'v')
if len(self.server_mode) != 1:
err(f'Invalid server mode {self.server_mode!r}.')
# enable_legacy_passwords should default to true
pwds = config.get('enable_legacy_passwords', '1').casefold().strip()
self.enable_legacy_passwords = pwds in ('true', 'yes', '1')
self.new_domain: Optional[str] = None
self.legacy_domains: Optional[list[str]] = None
if 'new_domain' in config and 'legacy_domains' in config:
self.new_domain = config['new_domain']
self.legacy_domains = config['legacy_domains'].split(',')
kwargs = {}
for i in 'ident', 'realname', 'ns_identity', 'connect_modes', \
'quit_message':
if i in config:
kwargs[i] = config[i]
# Create the IRC object
self.irc = miniirc.IRC(config['ip'], # type: ignore
int(config['ssl_port']), config['nick'],
set(map(str.strip, config['channels'].split(','))),
debug=debug, auto_connect=False, ssl=True, **kwargs) # type: ignore
# Load irc.chans and irc.users
self.irc.require('chans')
self.irc.require('users')
# Add handlers
self.irc.Handler('PRIVMSG', colon=False)(self._handle_privmsg)
self.irc.Handler('JOIN', colon=False)(self._handle_join)
# Connect
self.irc.connect()
# Function copied from lurklite
def _conf_assert(self, *keys: Union[str, tuple[str, type]]) -> None:
for key in keys:
req: Optional[type] = None
if isinstance(key, tuple):
key, req = key
if key not in self.config:
err('Required config value {} missing!', repr(key))
elif req:
try:
req(self.config[key])
except:
err('Config value {} contains an invalid {}.', repr(key),
req.__name__)
# Check if a hostmask is a server
def is_server(self, channel: Union[str, AbstractChannel],
hostmask: Union[Hostmask, User]) -> bool:
if not isinstance(channel, Channel):
if isinstance(channel, AbstractChannel):
channel = channel.name
try:
channel = self.chans[channel]
except:
# Oops, use a backup method
if not isinstance(hostmask, User):
hostmask = self.users[hostmask]
return 'players' in hostmask.keys()
modes = self.server_list or channel.modes.getset(self.server_mode)
if isinstance(hostmask, User):
hostmask = hostmask.hostmask
elif not isinstance(hostmask, Hostmask):
raise TypeError('is_server() expects User or hostmask.')
return hostmask[0] in modes or hostmask[0].lower() in modes
# Check if a user is an admin
def is_admin(self, channel: Union[str, AbstractChannel],
hostmask: Union[Hostmask, User, str]) -> bool:
if not isinstance(channel, Channel):
if isinstance(channel, AbstractChannel):
channel = channel.name
channel = self.chans[channel]
nick: str
if isinstance(hostmask, User):
nick = hostmask.nick
elif isinstance(hostmask, Hostmask):
nick = hostmask[0]
else:
nick = hostmask
lnick: str = nick.lower()
for mode in 'oaq':
users = channel.modes.getset(mode)
if nick in users or lnick in users:
return True
return False
# Derive a password from a hostmask
def get_password(self, hostmask: Hostmask, legacy: bool = False) -> str:
host = '/'.join(hostmask[2].split('/', 3)[:3])
if legacy:
host = '.'.join(host.split('.', 2)[:2])
pw = f'{hostmask[0]}@{host}'.encode('utf-8')
pw += b', secret: ' + self._secret
# Hash it
return hashlib.sha512(pw).hexdigest()
# Get an iterable list with servers
def servers(self, channel: Union[AbstractChannel, str]):
if isinstance(channel, str):
channel = self.chans[channel]
for user in channel.users:
if self.is_server(channel, user):
yield user
# Get a list of both servers and players
def items(self, channel: Union[AbstractChannel, str]):
for server in self.servers(channel):
yield server, server.get('players')
# The players command
def _players_cmd(self, channel: str, nick: str) -> None:
irc: AbstractIRC = self.irc
t = time.monotonic()
if t <= self.cooldowns.get(channel, -math.inf) + self.cooldown:
if channel in self.cooldown_msgs_sent:
return
irc.msg(channel, f'{nick}: You can only run \2.players\2 once',
f'every \2{self.cooldown} seconds\2.')
self.cooldown_msgs_sent.add(channel)
return
self.cooldowns[channel] = t
self.cooldown_msgs_sent.discard(channel)
# Get the player list
total: int = 0
inactive: int = 0
tplayers: int = 0
slist: list[tuple[User, PlayerList]] = list(self.items(channel))
slist.sort(key = lambda s : s[0].nick.lower())
# Iterate over every server in the channel
for server, players in slist:
if not players:
inactive += 1
continue
total += 1
tplayers += len(players)
players2: list[Player] = list(players.values())
players2.sort()
irc.msg(channel, 'Players on \2{}\2: {}'.format(server.nick,
', '.join(players2)))
self.cooldowns[channel] += 0.5
time.sleep(0.5)
# Display the summary
irc.msg(channel, ('Total: \2{} player{}\2 across \2{} active '
'server{}\2 (and {} empty server{}).').format(tplayers,
plural(tplayers), total, plural(total), inactive,
plural(inactive)))
# The login command
def _login_cmd(self, nick: str, param: str) -> None:
irc: AbstractIRC = self.irc
params = param.split(' ', 1)
del param
if len(params) != 2:
irc.msg(nick, 'Invalid syntax! Syntax: login <server> <password>')
return
sid, pw = params
del params
if sid not in self.users:
irc.msg(nick, f"What's a {sid!r}?")
return
server: User = self.users[sid]
if 'players' not in server.keys():
irc.msg(nick, f'{sid!r} is not a server!')
server['logged_in'] = 0
server.msg('login trackr', pw)
irc.msg(nick, 'I will attempt to log in.')
# Handle the moderation commands
def _moderate(self, channel: str, hostmask: Hostmask, cmd: str,
param: str) -> str:
if not self._secret:
return 'Moderation is disabled!'
chan: AbstractChannel = self.chans[channel]
if not isinstance(chan, Channel):
return 'Error: This should never happen.'
# Make sure the user is a channel operator
is_op = False
user: User = self.users[hostmask]
if not self.is_admin(channel, hostmask):
return 'Permission denied!'
# Make sure the player exists
n = param.split(' ', 1)
victim: str = n[0].lower()
server: Optional[User] = None
if '@' in victim:
victim, sid = victim.split('@', 1)
try:
server = self.users[sid]
assert server in chan
except:
return f'The server {sid!r} does not exist!'
if victim not in server.get('players', ()):
return f'The player {victim!r} is not in {server.nick}.'
else:
for s, p in self.items(chan):
if p and victim in p:
if server is not None:
return 'Error: That player is in multiple servers!'
server = s
if not server:
return 'Unknown player!'
elif not server.get('logged_in'):
return f'I am not logged into {server.nick}!'
player: Player = server['players'][victim] # type: ignore
res: Optional[str] = None
try:
if cmd in ('mute', 'unmute'):
res = getattr(player, cmd)()
elif cmd in ('warn', 'kick'):
res = getattr(player, cmd)(hostmask[0], n[-1])
elif cmd == 'tempmute':
player.tempmute(n[-1])
elif cmd == 'tempban':
try:
duration, message = n[-1].split(' ', 1)
except ValueError:
return 'Usage: tempban <player> <duration> <reason>'
player.tempban(hostmask[0], duration, message)
else:
return 'Internal error!'
except ModerationError as e:
return f'Error: {e}'
if not res:
res = f'Attempted to {cmd} {player}.'
return res
# Handle PRIVMSGs
def _handle_privmsg(self, irc: AbstractIRC, hostmask: Hostmask,
args: list[str]) -> None:
nick: str = hostmask[0]
channel: str = args[0]
msg: str = args[-1]
# Check for relayed users
if msg.startswith('<'):
n: list[str] = msg.split('> ', 1)
if len(n) > 1 and '>' not in n[0]:
nick = f'{n[0][1:]}@{nick}'
msg = n[1].strip()
del n
if msg.startswith('.players'):
msg = self.prefix + msg[1:]
# Check for commands
if msg.startswith(self.prefix):
cmd_args = msg[len(self.prefix):].split(' ', 1)
cmd = cmd_args[0].lower()
if irc.current_nick.lower() == args[0].lower():
if cmd != 'login':
irc.msg(hostmask[0],
'You may not execute commands in PMs.')
elif hostmask[-1].split('/')[-1].lower() in self.admins:
self._login_cmd(hostmask[0], cmd_args[-1])
else:
irc.msg(hostmask[0], 'Permission denied!')
return
if cmd == 'players':
return self._players_cmd(channel, nick)
elif cmd in ('kick', 'mute', 'unmute', 'tempmute', 'warn',
'tempban'):
irc.msg(channel, nick + ': ' + self._moderate(channel,
hostmask, cmd, cmd_args[1] if len(cmd_args) > 1 else ''))
return
elif cmd == 'badservers':
if not self._secret:
irc.msg(channel, f'{nick}: As moderation is disabled, no '
f'attempt has been made to log in to servers.')
return
bad = []
for s_ in self.servers(channel):
if not s_.get('logged_in'):
bad.append(s_.nick)
if bad:
bad.sort(key = lambda n : n.lower())
else:
bad.append('(none)')
irc.msg(channel,
f'{nick}: Servers I am not logged into: {", ".join(bad)}')
return
elif cmd == 'die':
if hostmask[-1].split('/')[-1].lower() in self.admins:
msg = f'{nick} ordered me to die- wait, why did I listen?'
irc.disconnect(msg)
print(msg)
os._exit(0)
else:
msg = random.choice(("But I don't want to die.", 'No.',
'Resistance is futile.', 'Sorry, what was that?',
'You know I could ignore you all day.',
"I'm going to pretend you didn't say that.",
'die: Singular form of dice.'))
irc.msg(channel, f'{nick}: {msg}')
return
if nick != hostmask[0] or not self.is_server(channel, hostmask):
return
# Store Player objects inside the User object so they are moved with
# nick changes etc.
server: User = self.users[hostmask]
players: PlayerList
if 'players' in server.keys():
players = server['players'] # type: ignore
assert isinstance(players, PlayerList)
else:
server['players'] = players = PlayerList()
players.server = server
if msg.startswith('*** '):
a: list[str] = msg.split(' ', 3)
if len(a) <= 2:
return
if a[2] == 'joined':
players.Player(a[1])
elif a[2] == 'left' and a[1] in players:
del players[a[1]]
del a
elif msg.startswith('Connected players: '):
new_players: list[str] = args[-1][19:].replace(' ', '').split(',')
for player in new_players:
players.Player(player)
for player, pobj in tuple(players.items()):
if str(pobj) not in new_players:
print('Deleting player', repr(pobj))
del players[player]
# Log in
if server.get('logged_in') is None and self._secret:
self.debug('[trackr] Logging into', hostmask[0])
server.msg('login trackr', self.get_password(hostmask))
elif msg.startswith('You are now logged in as') and self._secret:
self.debug('[trackr] Logged into', hostmask[0])
logged_in = server.get('logged_in')
if logged_in == 0 and logged_in is not False:
server.msg('cmd setpassword trackr',
self.get_password(hostmask))
server.msg('cmd /lua irc.say("[trackr] Logged in!")')
elif server.get('password_attempt'):
# Update the password to the current one if it's using an
# older password
print(f'[trackr] Auto-updating password for {hostmask[0]}')
server.msg('cmd setpassword trackr', self.get_password(hostmask))
del server['password_attempt']
server['logged_in'] = True
elif msg.startswith('Incorrect password'):
# Try using legacy passwords
attempt = server.get('password_attempt', 0)
if pw := self._get_pw_attempt(hostmask, attempt):
print(f'[trackr] Invalid password for {hostmask[0]!r}, trying '
f'with an older password (attempt {attempt + 1})')
server['password_attempt'] = attempt + 1
server.msg('login trackr', pw)
return
server['logged_in'] = False
print('[trackr] WARNING: Incorrect password for server',
hostmask[0], file=sys.stderr)
def _get_pw_attempt(self, hostmask: Hostmask, attempt: int) -> Optional[str]:
if not self.enable_legacy_passwords:
return None
# First try with the legacy password for the current hostmask
if attempt == 0 and hostmask[2].count('.') > 2:
return self.get_password(hostmask, True)
elif not self.new_domain or hostmask[2] != self.new_domain:
return None
# Then go through legacy domains
# This will be horribly slow
idx = (attempt - 1) // 2
if not self.legacy_domains or idx >= len(self.legacy_domains):
return None
return self.get_password(
Hostmask(hostmask[0], hostmask[1], self.legacy_domains[idx]),
attempt % 2 == 0,
)
# Handle JOINs
def _handle_join(self, irc: AbstractIRC, hostmask: Hostmask,
args: list[str]) -> None:
time.sleep(1)
if hostmask[0].lower() == irc.current_nick.lower():
for server in self.servers(args[0]):
server['players'] = players = PlayerList()
players.server = server
server.msg('players', '-',
'If you are a human, report this to the bot owner.')
return
if not self.is_server(args[0], hostmask):
return
user = self.users[hostmask]
players = PlayerList()
players.server = user
user['players'] = players
irc.msg(hostmask[0],
'players - If you are a human, report this to the bot owner.')
# The main script
def main() -> None:
import argparse, configparser
parser = argparse.ArgumentParser()
parser.add_argument('config_file',
help='The config file to use with trackr.')
parser.add_argument('--verbose', '--debug', action='store_true',
help='Enable verbose/debugging mode.')
parser.add_argument('-v', '--version', action='version',
version=f'trackr v{__version__} (powered by {miniirc.version})')
args = parser.parse_args()
# Load the config file
config = configparser.ConfigParser()
config.read(args.config_file)
# Create the bot
try:
trackr = Trackr(config, debug=args.verbose) # type: ignore
except BotError as e:
print(f'ERROR: {e}', file=sys.stderr)
raise SystemExit(1)
trackr.irc.wait_until_disconnected()
# Call main()
if __name__ == '__main__':
main()