344 lines
15 KiB
Python
Executable File
344 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# pylint: disable=W1401,C0303
|
|
# pylint: disable=consider-using-f-string
|
|
|
|
|
|
"""
|
|
+---------------------------------------------------------------------+
|
|
| |
|
|
| ____ _ __ ____ |
|
|
| / __ \ |/ // __ \____ _____ ____ _____ |
|
|
| / / / / // /_/ / __ `/ __ `/ _ \/ ___/ |
|
|
| / /_/ / |/ ____/ /_/ / /_/ / __/ / |
|
|
| /_____/_/|_/_/ \__,_/\__, /\___/_/ |
|
|
| -= DK1MI =- /____/ |
|
|
| |
|
|
| |
|
|
| A DAPNET bot that alerts you when stations pop up on a DX cluster |
|
|
| that are new ones |
|
|
| |
|
|
| Author: Michael Clemens, DK1MI (dxpager@qrz.is) |
|
|
| |
|
|
| Documentation: Please see the README.md file |
|
|
| License: Please see the LICENSE file |
|
|
| Repository: https://codeberg.org/mclemens/dxpager |
|
|
| |
|
|
+---------------------------------------------------------------------+
|
|
"""
|
|
|
|
import sys
|
|
import csv
|
|
import re
|
|
import os
|
|
from os.path import exists
|
|
import configparser
|
|
import zipfile
|
|
from telnetlib import Telnet
|
|
from pathlib import Path
|
|
import json
|
|
import requests
|
|
import hashlib
|
|
from requests.auth import HTTPBasicAuth
|
|
from cachetools import TTLCache
|
|
|
|
|
|
class DXPager():
|
|
"""DXPager class"""
|
|
|
|
def __init__(self):
|
|
"""initialize things"""
|
|
|
|
self.print_banner()
|
|
|
|
self.vip_calls = []
|
|
|
|
self.cache = TTLCache(maxsize=100, ttl=3600)
|
|
self.config = configparser.ConfigParser()
|
|
self.home_dir = str(Path.home())
|
|
self.config_dir = self.home_dir + "/.config/dxpager/"
|
|
# Check if config directory exists and else create it
|
|
Path(self.config_dir).mkdir(parents=True, exist_ok=True)
|
|
self.config_file = os.path.expanduser(self.config_dir + 'dxpager.ini')
|
|
self.read_config(self.config, self.config_file)
|
|
|
|
self.check_files()
|
|
|
|
if self.config['lotw']['user'] != "N0CALL" and self.check_lotw_confirmed:
|
|
self.confirmed_entities = self.get_confirmed_entities()
|
|
|
|
if self.check_cty:
|
|
with open(self.config_dir + self.config['files']['cty'], encoding='us-ascii') as csvfile:
|
|
self.cty = list(csv.reader(csvfile, delimiter=','))
|
|
|
|
if self.check_lotw_activity:
|
|
with open(self.config_dir + self.config['files']['lotw_activity'], encoding='us-ascii') as csvfile:
|
|
self.lotw_activity = list(csv.reader(csvfile, delimiter=','))
|
|
|
|
if self.config['filter'] and self.config['filter']['vip_calls']:
|
|
self.vip_calls = self.config['filter']['vip_calls'].split(',')
|
|
print(self.vip_calls)
|
|
|
|
@staticmethod
|
|
def print_banner():
|
|
"""print an awesome banner"""
|
|
print(" ____ _ __ ____ ")
|
|
print(" / __ \ |/ // __ \____ _____ ____ _____ ")
|
|
print(" / / / / // /_/ / __ `/ __ `/ _ \/ ___/ ")
|
|
print(" / /_/ / |/ ____/ /_/ / /_/ / __/ / ")
|
|
print(" /_____/_/|_/_/ \__,_/\__, /\___/_/ ")
|
|
print(" -= DK1MI =- /____/ ")
|
|
print("")
|
|
|
|
|
|
@staticmethod
|
|
def read_config(config, file_name):
|
|
"""reads the configuration from the config file or
|
|
creates a default config file if none could be found"""
|
|
if os.path.isfile(file_name):
|
|
config.read(file_name)
|
|
else:
|
|
config = configparser.ConfigParser()
|
|
config['cluster'] = {
|
|
'host': 'dxc.nc7j.com',
|
|
'port': '7373',
|
|
'user': 'N0CALL',
|
|
'timeout': '100'}
|
|
config['filter'] = {
|
|
'vip_calls': 'DK1MI,N0CALL'}
|
|
config['dapnet'] = {
|
|
'dapnet_user': 'N0CALL',
|
|
'dapnet_pass': 'xxxxxxxxxxxxxxxxxxxx',
|
|
'dapnet_url': 'http://www.hampager.de:8080/calls',
|
|
'dapnet_callsigns': 'N0CALL',
|
|
'dapnet_txgroup': 'dl-all'}
|
|
config['files'] = {
|
|
'cty': 'cty.csv',
|
|
'cty_url': 'https://www.country-files.com/bigcty/download/bigcty.zip',
|
|
'lotw_confirmed': 'lotw.adi',
|
|
'lotw_activity': 'lotw-user-activity.csv',
|
|
'lotw_activity_url': 'https://lotw.arrl.org/lotw-user-activity.csv'}
|
|
config['lotw'] = {
|
|
'user': 'N0CALL',
|
|
'password': 'CHANGEME',
|
|
'mode': 'ssb'}
|
|
|
|
with open(file_name, 'w', encoding='us-ascii') as configfile:
|
|
config.write(configfile)
|
|
print("\nNo configuration file found. A new configuration file has been created.")
|
|
print("\nPlease edit the file " + file_name + " and restart the application.\n" )
|
|
sys.exit()
|
|
return config
|
|
|
|
@staticmethod
|
|
def download_file(url, local_filename):
|
|
"""downloads a file via HTTP and saves it to a defined file"""
|
|
with requests.get(url, stream=True) as request:
|
|
request.raise_for_status()
|
|
with open(local_filename, 'wb') as file:
|
|
for chunk in request.iter_content(chunk_size=8192):
|
|
file.write(chunk)
|
|
return local_filename
|
|
|
|
|
|
def check_files(self):
|
|
"""Checks if all necessary files are in the file system.
|
|
Downloads all files and unzips them (if necessary)"""
|
|
|
|
# check for lotw qsl information file
|
|
if self.config['lotw']['user'] != "N0CALL":
|
|
self.check_lotw_confirmed = exists(self.config_dir + self.config['files']['lotw_confirmed'])
|
|
if not self.check_lotw_confirmed:
|
|
print("The file " + self.config_dir + self.config['files']['lotw_confirmed'] + " is missing.")
|
|
user = self.config['lotw']['user']
|
|
password = self.config['lotw']['password']
|
|
mode = self.config['lotw']['mode']
|
|
url = "https://lotw.arrl.org/lotwuser/lotwreport.adi?login={}&password={}"\
|
|
"&qso_query=1&qso_qsl=yes&qso_mode={}&qso_qsldetail=yes&"\
|
|
"qso_qslsince=1970-01-01".format(user, password, mode)
|
|
print("Trying to download " + url)
|
|
self.download_file(url, self.config_dir + self.config['files']['lotw_confirmed'])
|
|
self.check_lotw_confirmed = exists(self.config_dir + self.config['files']['lotw_confirmed'])
|
|
if self.check_lotw_confirmed:
|
|
print("File successfully downloaded")
|
|
else:
|
|
print("something went wrong while downloading " + url)
|
|
else:
|
|
self.check_lotw_confirmed = False
|
|
|
|
# check for cty.csv file
|
|
self.check_cty = exists(self.config_dir + self.config['files']['cty'])
|
|
if not self.check_cty:
|
|
url = self.config['files']['cty_url']
|
|
print("The file " + self.config_dir + self.config['files']['cty'] + " is missing.")
|
|
print("Trying to download " + url)
|
|
zip_name = self.download_file(url, self.config_dir + "bigcty.zip" )
|
|
with zipfile.ZipFile(zip_name, 'r') as zip_ref:
|
|
zip_ref.extract("cty.csv", path=self.config_dir)
|
|
os.remove(zip_name)
|
|
self.check_cty = exists(self.config_dir + self.config['files']['cty'])
|
|
if self.check_cty:
|
|
print("File successfully downloaded and extracted.")
|
|
else:
|
|
print("something went wrong while downloading " + url)
|
|
|
|
# check for lotw user activity file
|
|
self.check_lotw_activity = exists(self.config_dir + self.config['files']['lotw_activity'])
|
|
if not self.check_lotw_activity:
|
|
url = self.config['files']['lotw_activity_url']
|
|
print("The file " + self.config_dir + self.config['files']['lotw_activity'] + " is missing.")
|
|
print("Trying to download " + url)
|
|
self.download_file(url, self.config_dir + self.config['files']['lotw_activity'])
|
|
self.check_lotw_activity = exists(self.config_dir + self.config['files']['lotw_activity'])
|
|
if self.check_lotw_activity:
|
|
print("File successfully downloaded")
|
|
else:
|
|
print("something went wrong while downloading " + url)
|
|
|
|
|
|
def get_confirmed_entities(self):
|
|
"""Reads the file downloaded from LotW with all confirmed QSOs,
|
|
extracts all confirmed DXCCs and puts them into a list"""
|
|
ret = []
|
|
with open(self.config_dir + self.config['files']['lotw_confirmed'], \
|
|
encoding='us-ascii') as file:
|
|
for row in file:
|
|
if re.search("<DXCC:", row):
|
|
dxcc = row.partition(">")[2].lower().rstrip()
|
|
if dxcc not in ret:
|
|
ret.append(dxcc)
|
|
return ret
|
|
|
|
|
|
def check_lotw(self, call):
|
|
"""Reads the LotW user activity file and returns the date
|
|
of the last upload date if a specific call sign"""
|
|
ret = ""
|
|
for row in self.lotw_activity:
|
|
if call == row[0]:
|
|
ret = row[1]
|
|
return ret
|
|
return ret
|
|
|
|
|
|
def get_cty_row(self, call):
|
|
"""Parses all CTY records, tries to find the DXCC entity of a
|
|
specific call sign and returns the line as a list of strings"""
|
|
done = False
|
|
while not done:
|
|
for row in self.cty:
|
|
entities = row[9].replace(";", "").replace("=", "").split(" ")
|
|
# TODO: Check if it is a speciall call (=) and mark it in the list
|
|
for prefix in entities:
|
|
if call == prefix:
|
|
return row
|
|
call = call[:-1]
|
|
if call == "":
|
|
return ["-", "-", "-", "-", "-", "-", "-"]
|
|
return None
|
|
|
|
|
|
def get_spots(self):
|
|
"""Connects to the specified telnet dx cluster, performs a login, grabs the
|
|
output row by row, enriches it with data"""
|
|
with Telnet(self.config['cluster']['host'], int(self.config['cluster']['port']), \
|
|
int(self.config['cluster']['timeout'])) as telnet:
|
|
while True:
|
|
line_enc = telnet.read_until(b"\n") # Read one line
|
|
try:
|
|
#line = line_enc.decode('ascii')
|
|
line = line_enc.decode('utf-8')
|
|
except:
|
|
print("Error while encoding the following line:")
|
|
print(line_enc)
|
|
# Enters the call sign if requested
|
|
if "enter your call" in line:
|
|
b_user = str.encode(self.config['cluster']['user']+"\n")
|
|
telnet.write(b_user)
|
|
# Detects the beginning of the stream
|
|
elif " Hello " in line:
|
|
print(line)
|
|
# This is true for every line representing a spot
|
|
elif "DX de" in line or "Dx de" in line:
|
|
try:
|
|
# Extract all necessary fields from the line and store them
|
|
# into different variables.
|
|
call_de = re.search('D(X|x) de (.+?): ', line).group(2)
|
|
freq = re.search(': +(.+?) ', line).group(1)
|
|
call_dx = re.search(freq + ' +(.+?) ', line).group(1)
|
|
#time = re.search('[^ ]*$', line).group(0)[0:4]
|
|
time = re.search(' (\d{4})Z', line).group(1)
|
|
comment = re.search(call_dx + ' +(.+?) +' + time, line).group(1)
|
|
|
|
# If the CTY file is available, further information will be
|
|
# gathered from it, e.g. continent, country, dxcc ID
|
|
if self.check_cty:
|
|
cty_details = self.get_cty_row(call_dx)
|
|
else:
|
|
cty_details = ["-","-","-","-","-","-","-","-","-","-"]
|
|
|
|
areaname = cty_details[1]
|
|
continent = cty_details[3]
|
|
|
|
# If the LotW user activity file is available and the call
|
|
# sign in question is actually a LotW user, we'll add a (L)
|
|
# to the message
|
|
if self.check_lotw_activity and self.check_lotw(call_dx):
|
|
lotw = " (L) "
|
|
else:
|
|
lotw = " "
|
|
|
|
# Removes the trailing .0 from a frequency for better readability
|
|
freq = freq.replace('.0', '')
|
|
|
|
msg = "{}: {} de {} {} {} ({}){}{}"\
|
|
.format(time, call_dx, call_de, freq, areaname, continent, lotw, comment)
|
|
dapnet_json = json.dumps({"text": msg, "callSignNames": \
|
|
[self.config['dapnet']['dapnet_callsigns']], \
|
|
"transmitterGroupNames": [self.config['dapnet']['dapnet_txgroup']], \
|
|
"emergency": False})
|
|
cf = call_dx+freq
|
|
hash_entry = hashlib.md5(cf.encode())
|
|
|
|
# check if the DX call is in your VIP call list.
|
|
# Call sign addition like /P will be ignored
|
|
is_vip_call = False
|
|
my_regex = r"(^.*\/)?" + re.escape(call_dx) + r"(\/.*?)?$"
|
|
for vip in self.vip_calls:
|
|
if re.search(my_regex, vip, re.IGNORECASE):
|
|
is_vip_call = True
|
|
|
|
# If the DX station's entity hasn't been worked/confirmed via
|
|
# LotW yet, the message will be sent to the dapnet API
|
|
if hash_entry.hexdigest() not in self.cache.keys():
|
|
if (self.check_lotw_confirmed and self.config['lotw']['user'] != "N0CALL" \
|
|
and cty_details[2] not in self.confirmed_entities) or \
|
|
is_vip_call:
|
|
response = requests.post(self.config['dapnet']['dapnet_url'], \
|
|
data=dapnet_json, auth=HTTPBasicAuth(\
|
|
self.config['dapnet']['dapnet_user'],\
|
|
self.config['dapnet']['dapnet_pass']))
|
|
self.cache[hash_entry.hexdigest()] = cf
|
|
print("!!! Sent to DAPNET: {} Response: {}".format(msg, response))
|
|
else:
|
|
print(" Entity already confirmed: {}".format(msg))
|
|
else:
|
|
print(" Duplicate: {}".format(msg))
|
|
|
|
except AttributeError:
|
|
print(line)
|
|
|
|
def main():
|
|
"""main routine"""
|
|
try:
|
|
dx_pager = DXPager()
|
|
dx_pager.get_spots()
|
|
except KeyboardInterrupt:
|
|
sys.exit(0)
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
sys.exit(main())
|
|
except EOFError:
|
|
pass
|