Python и работа в thread
Интерактивное консольное приложение на python.
В коде надо задать OID, которые надо считывать.
На компьютере должен быть подключен COM с подключенным GNSS-модулем.
Периодически считываем данные с девайса и пишем одновременно положение компьютера с GNSS - модуля в csv-файл.
Для меня здесь было из нового, что надо делать полезную работу и одновременно отвечать на запросы пользователя( Остановить работу по Ctrl+C).
А вот так ещё можно сделать исполняемый файл, для винды и unix.
wine pip install -r requirements.txt
wine pyinstaller -F -c src/main.py
pip install -r requirements.txt
pip install pyinstaller
pyinstaller -n main_unix -F -c src/main.py
Только надо не забыть, что Glibc должны на сборочной машине быть не новее, чем так где будем запускать.
#!/usr/bin/env python
import argparse
import contextlib
import signal
import sys
from datetime import datetime
from threading import Event, Thread
from time import sleep
import serial
from pysnmp.error import PySnmpError
# from easysnmp import Session
# from easysnmp.exceptions import EasySNMPError
from pysnmp.hlapi import *
from pyubx2 import UBXReader
last_gps_value = ["NOT_GPS_LAT", "NOT_GPS_LON"]
def get_last_gps_value(com, task_done):
global last_gps_value
if com:
try:
stream = serial.Serial(com, 9600, timeout=3)
except serial.serialutil.SerialException:
return
ubr = UBXReader(stream)
for (_, parsed_data) in ubr.iterate():
if task_done.is_set():
return
if parsed_data.identity == "GNGGA":
last_gps_value = [str(parsed_data.lat), str(parsed_data.lon)]
class SNMPWalkException(Exception):
pass
def walk(host, oid, timeout):
out = []
for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd(
SnmpEngine(),
CommunityData("public"),
UdpTransportTarget((host, 161), timeout=timeout, retries=0),
ContextData(),
ObjectType(ObjectIdentity(oid)),
lookupMib=False,
lexicographicMode=False,
):
if errorIndication:
print(errorIndication, file=sys.stderr)
raise SNMPWalkException
if errorStatus:
print(
"%s at %s"
% (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex) - 1][0] or "?",
),
file=sys.stderr,
)
raise SNMPWalkException
for varBind in varBinds:
out.append((varBind[0].prettyPrint(), varBind[1].prettyPrint()))
return out
@contextlib.contextmanager
def smart_open(filename=None):
if filename and filename != "-":
fh = open(filename, "w")
else:
fh = sys.stdout
try:
yield fh
finally:
if fh is not sys.stdout:
fh.close()
def get_elem_from_walk(neighbor_stat, num):
return [x for x in neighbor_stat if x[0].startswith(f"{OID_ALL_ELEMENTS}.{num}.")]
def name_stat_file(ip):
return "{0}-{1}.csv".format(datetime.now().strftime("%Y_%m_%d_%H_%M"), ip)
OID_ALL_ELEMENTS = "1.3.6.1.4.1.3942.1.1.1.2.1"
OID_ELEM_HEX = {"SomeValueHex": 2}
OID_ELEM_STR = {"SomeStrValue": 3}
OID_ELEM_INT = {"SomeIntValue": 4}
GPS_VALUES = ("LATITUDE", "LONGTITUDE")
def monitor(ip, task_done, timeout):
global last_gps_value
name_file_csv = name_stat_file(ip)
print(f"File for stat: {name_file_csv}")
# session = Session(hostname=ip, community="public", version=2)
try:
walk(ip, ".1.3.6.1.4.1.3942.1.1.5.1.1.1.1.4", timeout)
except SNMPWalkException:
print(
f"Device {ip} not UP or snmp not worked.\n" "Please run 'snmpd start'",
file=sys.stderr,
)
with open(name_file_csv, "w", encoding="utf-8") as f:
head = (
["DATETIME"]
+ list(GPS_VALUES)
+ list(OID_ELEM_STR.keys())
+ list(OID_ELEM_INT.keys())
)
print(",".join(head), file=f)
while True:
f.flush()
if task_done.is_set():
return
sleep(timeout)
date = str(datetime.now())
try:
neighbours_raw_stat = walk(ip, OID_ALL_ELEMENTS, timeout)
except SNMPWalkException:
print(f"{date} {ip}| walk failed, continue")
print(
",".join(
[date]
+ last_gps_value
+ ["DEVICE_NOT_UP"]
+ ["-"] * (len(OID_ELEM_INT))
),
file=f,
)
continue
print(f"{date} {ip}| got walk")
neighbours_stat = []
for elem in get_elem_from_walk(neighbours_raw_stat, "1"):
neighbours_stat.append([])
if not neighbours_stat:
neighbours_stat.append([])
print(
",".join(
[date]
+ last_gps_value
+ ["NO_VALUES"]
+ ["-"] * (len(OID_ELEM_INT))
),
file=f,
)
continue
for line in neighbours_stat:
print(
",".join([date] + last_gps_value + line),
file=f,
)
f.flush()
def main():
parser = argparse.ArgumentParser(description="probe")
parser.add_argument(
"--timeout", type=int, help="Timeout for requests in milliseconds", default=500
)
parser.add_argument(
"--com", type=str, help="Com-port for GNSS-module", default="COM3"
)
parser.add_argument(
"ip_address", metavar="IP", type=str, nargs="*", help="IP-address devices"
)
args = parser.parse_args()
ip_dev = args.ip_address
timeout = float(args.timeout) / 1000
if not ip_dev:
print("Need set ip_address for monitor values", file=sys.stderr)
parser.print_help()
sys.exit(2)
print(ip_dev)
task_done = Event()
threads = []
gps_thread = Thread(target=get_last_gps_value, args=(args.com, task_done))
threads.append(gps_thread)
gps_thread.start()
for ip in ip_dev:
t = Thread(target=monitor, args=(ip, task_done, timeout))
threads.append(t)
t.start()
def signal_handler(sig, frame):
print("You pressed Ctrl+C!")
print("Save data")
task_done.set()
signal.signal(signal.SIGINT, signal_handler)
while not task_done.is_set():
sleep(1)
for thr in threads:
thr.join()
return True
if __name__ == "__main__":
sys.exit(main())
#python #threading #signal #ctrl_c