HR50-Quisk-Widget/hrctl.py

285 lines
7.1 KiB
Python

#!/usr/bin/python3
import sys
import serial
import re
import gevent
import gevent.pywsgi
import gevent.queue
from tinyrpc.server.gevent import RPCServerGreenlets
from tinyrpc.dispatch import RPCDispatcher
from tinyrpc.protocols.jsonrpc import JSONRPCProtocol
from tinyrpc.transports.wsgi import WsgiServerTransport
class SerialHamlib:
def __init__(self, port: str) -> None:
self._ser = serial.Serial(port, timeout=0.5)
def write_cmd(self, cmd) -> None:
try:
self._ser.write(cmd)
except:
print("Error while sending serial command.")
def get_temperature(self) -> int:
try:
self._ser.write(b'HRTP;')
xx = self._ser.read_until(b'\n')
temp = int(xx[4:-4], base=10)
self._ser.reset_input_buffer()
return temp
except ValueError:
return -1
def get_voltage(self) -> float:
self._ser.write(b'HRVT;')
xx = self._ser.read_until(b'\n')
volt = float(xx[4:-4])
self._ser.reset_input_buffer()
return volt
def get_band(self) -> int:
self._ser.write(b'HRBN;')
xx = self._ser.read_until(b'\n')
band = int(xx[4:-3], base=10)
self._ser.reset_input_buffer()
return band
def get_atu_status(self) -> int:
self._ser.write(b'HRAT;')
xx = self._ser.read_until(b'\n')
# print((xx))
atu = int(chr(xx[4]))
self._ser.reset_input_buffer()
return atu
def set_tune_next(self) -> None:
self._ser.write(b'HRTU1;')
def set_band_6(self) -> None:
self._ser.write(b'HRBN0;')
def set_band_10(self) -> None:
self._ser.write(b'HRBN1;')
def set_band_12(self) -> None:
self._ser.write(b'HRBN2;')
def set_band_15(self) -> None:
self._ser.write(b'HRBN3;')
def set_band_17(self) -> None:
self._ser.write(b'HRBN4;')
def set_band_20(self) -> None:
self._ser.write(b'HRBN5;')
def set_band_30(self) -> None:
self._ser.write(b'HRBN6;')
def set_band_40(self) -> None:
self._ser.write(b'HRBN7;')
def set_band_60(self) -> None:
self._ser.write(b'HRBN8;')
def set_band_80(self) -> None:
self._ser.write(b'HRBN9;')
def set_band_160(self) -> None:
self._ser.write(b'HRBN10;')
def set_auto_tx_data(self, enable: int =1) -> None:
self._ser.write(b'HRRP1;')
def get_auto_tx_data(self) -> int:
self._ser.write(b'HRRP;')
xx = self._ser.read_until(b'\n')
# print((xx))
hrrp = int(chr(xx[4]))
self._ser.reset_input_buffer()
return hrrp
def get_tune_next(self) -> int:
self._ser.write(b'HRTU;')
xx = self._ser.read_until(b'\n')
print(xx)
tune_next = int(xx[4:-3], base=10)
self._ser.reset_input_buffer()
return tune_next
def get_tune_status(self) -> str:
self._ser.write(b'HRTS;')
xx = self._ser.read_until(b'\n')
return xx[5:-3]
def get_tx_status(self) -> str:
self._ser.write(b'HRMX;')
xx = self._ser.read_until(b'\n')
# print((xx))
tx = xx.decode()
self._ser.reset_input_buffer()
return tx
def await_tx_data(self) -> str:
found_hrrp = False
while (not found_hrrp):
xx = self._ser.read_until(b'\n')
print(xx)
match = re.search("HRMX",str(xx))
if match:
found_hrrp = True
def band_tostr(band: int) -> str:
if band == 0:
return "6M"
elif band == 1:
return "10M"
elif band == 2:
return "12M"
elif band == 3:
return "15M"
elif band == 4:
return "17M"
elif band == 5:
return "20M"
elif band == 6:
return "30M"
elif band == 7:
return "40M"
elif band == 8:
return "60M"
elif band == 9:
return "80M"
elif band == 10:
return "160M"
else:
return "unknown"
rpc = JSONRPCProtocol()
hardrock = SerialHamlib('/dev/ttyUSB0')
match = None
try:
print("Temp:", hardrock.get_temperature())
print("Volt:", hardrock.get_voltage())
print("Band:", band_tostr(hardrock.get_band()))
print("ATU:", hardrock.get_atu_status())
tx_status = hardrock.get_tx_status()
match = re.search("HRMX P(\d+) A(\d+) S(\d+) T(\d+)",tx_status)
except:
print("Error while querying data from Hardrock 50.")
# print("TX:", tx_status)
if match:
print("Last transmit stats:")
print("PEP:", match.group(1))
print("AVG P:", match.group(2))
print("SWR: 1:", int(match.group(3)) / 10.0)
if (len(sys.argv) > 1):
if (sys.argv[1] == "tune"):
print("tune")
hardrock.set_tune_next()
print("tune next:",hardrock.get_tune_next())
hardrock.set_auto_tx_data()
print("auto tx:",hardrock.get_auto_tx_data())
hardrock.await_tx_data()
print("Tune status:", hardrock.get_tune_status())
dispatcher = RPCDispatcher()
transport = WsgiServerTransport(queue_class=gevent.queue.Queue)
# start wsgi server as a background-greenlet
wsgi_server = gevent.pywsgi.WSGIServer(('0.0.0.0', 8066), transport.handle)
gevent.spawn(wsgi_server.serve_forever)
rpc_server = RPCServerGreenlets(
transport,
JSONRPCProtocol(),
dispatcher
)
@dispatcher.public
def getTemperature():
return hardrock.get_temperature()
@dispatcher.public
def setTune():
hardrock.set_tune_next()
return hardrock.get_tune_next()
@dispatcher.public
def setBand6():
hardrock.set_band_6()
@dispatcher.public
def setBand10():
hardrock.set_band_10()
@dispatcher.public
def setBand12():
hardrock.set_band_12()
@dispatcher.public
def setBand15():
hardrock.set_band_15()
@dispatcher.public
def setBand17():
hardrock.set_band_17()
@dispatcher.public
def setBand20():
hardrock.set_band_20()
@dispatcher.public
def setBand30():
hardrock.set_band_30()
@dispatcher.public
def setBand40():
hardrock.set_band_40()
@dispatcher.public
def setBand60():
hardrock.set_band_60()
@dispatcher.public
def setBand80():
hardrock.set_band_80()
@dispatcher.public
def setBand160():
hardrock.set_band_160()
@dispatcher.public
def getStatus():
result = {}
tx_status = hardrock.get_tx_status()
match = re.search("HRMX P(\d+) A(\d+) S(\d+) T(\d+)",tx_status)
# print("TX:", tx_status)
if match:
result["pep"] = int(match.group(1))
result["avg"] = int(match.group(2))
result["swr"] = int(match.group(3)) / 10.0
result["temp"] = int(match.group(4))
result["band"] = band_tostr(hardrock.get_band())
#print("Last transmit stats:")
# print("PEP:", match.group(1))
# print("AVG P:", match.group(2))
# print("SWR: 1:", int(match.group(3)) / 10.0)
print(result)
return result
# in the main greenlet, run our rpc_server
rpc_server.serve_forever()