114 lines
3.2 KiB
Python
114 lines
3.2 KiB
Python
#!/usr/bin/env python3
|
|
from logging import basicConfig, DEBUG, INFO, WARN, ERROR, CRITICAL, getLogger
|
|
from logging.handlers import TimedRotatingFileHandler
|
|
from os.path import exists, join, dirname, abspath
|
|
from json import loads, dumps
|
|
from json.decoder import JSONDecodeError
|
|
import pendulum
|
|
|
|
# Get the full path for this file
|
|
currentdir = dirname(abspath(__file__))
|
|
|
|
# Target log file
|
|
TARGET = join("bbs", join("logs", "enigma-bbs.log"))
|
|
|
|
# Setup logging
|
|
# DEBUG, INFO, WARN, ERROR, CRITICAL
|
|
basicConfig(
|
|
level=DEBUG,
|
|
format="%(asctime)s - %(filename)s (%(lineno)d) - %(name)s - %(levelname)s - %(message)s",
|
|
handlers=[
|
|
TimedRotatingFileHandler(
|
|
filename=join(currentdir, "failUser.log"),
|
|
when="midnight",
|
|
backupCount=1,
|
|
),
|
|
#logging.StreamHandler(stream=sys.stdout),
|
|
],
|
|
)
|
|
|
|
log = getLogger("failUser")
|
|
|
|
# Config JSON
|
|
def save_config(con):
|
|
with open("failUser.cfg", "w") as f:
|
|
f.write(dumps(con, indent=4, sort_keys=False))
|
|
|
|
def load_config():
|
|
if not exists("failUser.cfg"):
|
|
now = pendulum.now().to_datetime_string()
|
|
defaults = {
|
|
# Target enigma logs
|
|
"target": "bbs/logs/enigma-bbs.log",
|
|
# block_time in hours
|
|
"block_time": 4,
|
|
# Last unblock
|
|
"last_unblock": now,
|
|
# List of bad users to detect and block
|
|
"bad_users": [
|
|
"root",
|
|
"postgres",
|
|
"mysql",
|
|
"apache",
|
|
"nginx",
|
|
],
|
|
}
|
|
save_config(defaults)
|
|
return defaults
|
|
else:
|
|
with open("failUser.cfg", "r") as f:
|
|
config = loads(f.read())
|
|
return config
|
|
|
|
# blocks in json
|
|
def add_block(ip, time):
|
|
# first load in all blocks
|
|
try:
|
|
with open("blocks.json", "r") as f:
|
|
blocks = loads(f.read())
|
|
except FileNotFoundError:
|
|
blocks = {}
|
|
pass
|
|
# add ip and time
|
|
#log.debug("Added {0} in blocks.json".format(ip))
|
|
blocks[ip] = time
|
|
# update blocks
|
|
with open("blocks.json", "w") as f:
|
|
f.write(dumps(blocks))
|
|
|
|
def rm_block(ip):
|
|
# first load all blocks
|
|
try:
|
|
with open("blocks.json", "r") as f:
|
|
blocks = loads(f.read())
|
|
except FileNotFoundError:
|
|
return
|
|
try:
|
|
if blocks[ip]:
|
|
#log.debug("Removed {0} in blocks.json".format(ip))
|
|
del blocks[ip]
|
|
# update blocks
|
|
with open("blocks.json", "w") as f:
|
|
f.write(dumps(blocks))
|
|
except KeyError:
|
|
log.debug("Unable to unblock '{0}'".format(ip))
|
|
|
|
def check_blocks():
|
|
# return a list of ips exceeding block_time in config
|
|
result = []
|
|
conf = load_config()
|
|
# load in blocks
|
|
try:
|
|
with open("blocks.json", "r") as f:
|
|
blocks = loads(f.read())
|
|
except FileNotFoundError:
|
|
return
|
|
now = pendulum.now()
|
|
for ip in blocks:
|
|
dt = pendulum.parse(blocks[ip])
|
|
#log.debug("IP={0} TIME_LEFT={1}".format(ip, abs(now.diff(dt, False).in_hours())))
|
|
if now.diff(dt).in_hours() > conf["block_time"]:
|
|
# Oops, this ip needs to be unblocked
|
|
result.append(ip)
|
|
if result:
|
|
return result |