====== Python Programming ====== ===== Python Virtual Environments ===== ==== Create virtual environment inside the project folder ==== python -m venv .venv ==== Activate It ==== .\.venv\Scripts\activate ==== Install Packages ==== pip install ==== Create Requirements.txt ==== pip freeze > requirements.txt ==== Install Packages All at Once ==== pip install -r requirements.txt ==== Deactivate ==== deactivate ===== Distribute Standalone EXE using Pyinstaller ===== **Note: It is highly recommended to download the Python program from python.org directly. Python installed from Microsoft Store may have issues (speaking from my personal experience when wxPython was used)** - Install pyinstaller package ''pip install -U pyinstaller'' - Execute ''pyinstaller --noconsole --onefile --windowed your_application.py'' ===== Regular Expression ===== https://developers.google.com/edu/python/regular-expressions ===== Threading ===== https://christopherdavis.me/blog/threading-basics.html ===== Pandas Dataframe Multiindexing ===== https://towardsdatascience.com/accessing-data-in-a-multiindex-dataframe-in-pandas-569e8767201d ===== My NTAG, I2C, Wifi, Subprocess, Timeout, Argparsing Example ===== **tested on Raspbian Jessie** #!/usr/bin/env python3 import smbus import math import re import subprocess import shlex import os import time import signal import functools import argparse # common i2c functions are: # read_byte(addr) # write_byte(addr, val) # read_byte_data(addr, cmd) # write_byte_data(addr, cmd, val) # read_word_data(addr, cmd) # write_word_data(addr, cmd, val) # read_i2c_block_data(addr, cmd, count) # write_i2c_block_data(addr, cmd, vals) class TimedOut(Exception): pass def call_with_timeout(timeout, f, *args, **kwargs): """Call f with the given arguments, but if timeout seconds pass before f returns, raise TimedOut. The exception is raised asynchronously, so data structures being updated by f may be in an inconsistent state. """ def handler(signum, frame): raise TimedOut("Timed out after {} seconds.".format(timeout)) old = signal.signal(signal.SIGALRM, handler) try: signal.alarm(timeout) try: return f(*args, **kwargs) finally: signal.alarm(0) finally: signal.signal(signal.SIGALRM, old) def with_timeout(timeout): """Decorator for a function that causes it to timeout after the given number of seconds. """ def decorator(f): @functools.wraps(f) def wrapped(*args, **kwargs): return call_with_timeout(timeout, f, *args, **kwargs) return wrapped return decorator # def run_program(rcmd): # Runs a program, and it's parameters (e.g. rcmd = 'ls -lh /var/www') # Returns output if sucessful, or None and logs error if not. @with_timeout(50) def run_program(rcmd, wpacli=False): cmd = shlex.split(rcmd) try: proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if (wpacli): for line in iter(proc.stdout.readline, ''): line = line.decode('utf-8') if (re.search(r'Trying to associate with', line)) : print(line, end='') scanned = re.search(r"Trying to associate with\s*(..:..:..:..:..:..).*?SSID='(.*?)'", line) if (scanned): print('SSID = "{}", bssid = "{}"'.format(scanned.group(2), scanned.group(1))) elif (re.search(r'CTRL-EVENT-CONNECTED - Connection to', line)): # connected print(line, end='') break elif (re.search(r'CTRL-EVENT-REGDOM-CHANGE', line)): print(line, end='') # break # not connected but could be another ssid to try elif (re.search(r'CTRL-EVENT', line) or re.search(r'WPA: 4-Way Handshake failed', line) or re.search(r'WPS-AP-AVAILABLE', line) or re.search(r'Associated with', line) or re.search(r'WPA: Key negotiation completed with', line)): print(line, end='') elif (re.search(r'<\d>', line)): print(line, end='') resp = proc.communicate(input=b'quit\n') else: resp = proc.communicate() except TimedOut: print('timed out') proc.terminate() resp = proc.communicate() finally: return resp # Function to put credentials to /etc/wpa_supplicant/wpa_supplicant.conf # Returns True if updated or added, False if nothing to do def addwpa(ssid, psk): if (not ssid or not psk): return try: f = open('/etc/wpa_supplicant/wpa_supplicant.conf', 'r') wpafile = f.read() pattern = r'(network\s*=\s*{.*?ssid\s*=\s*"' + ssid + r'".*?psk\s*=\s*")(.*?)(".*?})' # print('Search pattern: {}'.format(pattern)) network = re.search(pattern, wpafile, re.DOTALL) if (network): # update exiting network oldpsk = network.group(2) if (psk != oldpsk): # only do work if needed substr = r'\g<1>' + psk + r'\g<3>' newfile = re.subn(pattern, substr, wpafile, flags=re.DOTALL) # flags are important f = open('/etc/wpa_supplicant/wpa_supplicant.conf', 'w') f.write(newfile[0]) f.close() print('Existing Wifi network "{}" has been updated.'.format(ssid)) return True else: print('Existing Wifi network "{}" was up to date.'.format(ssid)) return False else: # add new network newnet = 'network={\n\tssid="' + ssid + '"\n\tmode=0\n\tpsk="' + psk + '"\n}\n' f = open('/etc/wpa_supplicant/wpa_supplicant.conf', 'a') f.write(newnet) f.close() print('New Wifi network "{}" has been added.'.format(ssid)) return True except (OSError, IOError) as err: print(err) def read_ntag(): bus = smbus.SMBus(1) addr = 0x55 block1 = bus.read_i2c_block_data(addr, 0x01, 16) # read the first user block and check validity try: if (block1[1] == 0xFF and block[2] != 0xD1): raise ValueError('Message has more than 248 characters are not supported') if (block1[2] != 0xD1): raise ValueError('It is not a well-known type that I can understand.') if (block1[5] != 0x54): raise ValueError('It is not a text NDEF record that I can interpret.') recordlen = block1[4] payload = block1[9:16] i = 0x2 while i <= 0x2 + math.floor((recordlen-9)/16): # read entire payload record block = bus.read_i2c_block_data(addr, i, 16) payload = payload + block i = i+1 # print(' '.join(format(x, '02X') for x in payload)) del payload[recordlen-2:] # trim the extras if (payload[recordlen-3] != 0xFE): raise ValueError('Expecting EOF marking 0xFE. Record length incorrect.') del payload[recordlen-3:] payload = ''.join(chr(x) for x in payload) value = re.findall(r'(["\'])(.*?)\1', payload) if (len(value)<2): raise ValueError('Please put two values in " " or \' \'.') return value[0][1], value[1][1] # return ssid, psk except ValueError as err: print(err) finally: bus.close() # main function def main(): parser = argparse.ArgumentParser() parser.add_argument('-f', '--force', help='force wifi reconnect', action='store_true') args = parser.parse_args() username, password = read_ntag() if (args.force or addwpa(username, password)): # re-start wpa_supplicant print('Connecting Wifi ...') if (not os.path.isfile('/run/wpa_supplicant.wlan0.pid')): print('wpa_supplicant not running') resp = run_program('wpa_cli status') for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='') # run_program('/sbin/wpa_supplicant -s -B -P /run/wpa_supplicant.wlan0.pid -i wlan0 -D nl80211,wext -c /etc/wpa_supplicant/wpa_supplicant.conf') else: print('ifdown wlan0') resp = run_program('ifdown wlan0') for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='') time.sleep(3) print('ifup wlan0') resp = run_program('ifup wlan0') for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='') resp = run_program('wpa_cli', True) # run wpa_cli in interactive mode to get wpa_supplicant output for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='') resp = run_program('wpa_cli status') for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='') print('Getting IP address...') time.sleep(10) ipaddr = subprocess.check_output(['hostname', '-I']) print('IP Address: {}'.format(ipaddr)) else: print('Nothing to do') print('Type "sudo startx" to enter graphic mode.') if __name__ == '__main__': main() ===== Subprocess and Wifi ===== #!/usr/bin/env python # -*- coding: utf-8 -*- # glenn@sensepost.com / @glennzw # Handle wireless networking from Python # The name (evil.py) is a play on 'wicd' from subprocess import Popen, call, PIPE import errno from types import * import logging import sys import logging import time import argparse import re import shlex SUPPLICANT_LOG_FILE = "wpa_supplicant.log" """ This bit of code allows you to control wireless networking via Python. I chose to encapsualte wpa_supplicant because it has the most consistent output with greatest functionality. Currently supports OPEN, WPA[2], and WEP. #e.g: >>> iface = get_wnics()[0] >>> start_wpa(iface) >>> networks = get_networks(iface) >>> connect_to_network(iface, "myHomeNetwork", "WPA", "singehackshackscomounaniƱa") >>> is_associated(iface) True >>> do_dhcp(iface) >>> has_ip(iface) True """ logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(filename)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='evil.log', filemode='w') def run_program(rcmd): """ Runs a program, and it's paramters (e.g. rcmd="ls -lh /var/www") Returns output if successful, or None and logs error if not. """ cmd = shlex.split(rcmd) executable = cmd[0] executable_options=cmd[1:] try: proc = Popen(([executable] + executable_options), stdout=PIPE, stderr=PIPE) response = proc.communicate() response_stdout, response_stderr = response[0], response[1] except OSError, e: if e.errno == errno.ENOENT: logging.debug( "Unable to locate '%s' program. Is it in your path?" % executable ) else: logging.error( "O/S error occured when trying to run '%s': \"%s\"" % (executable, str(e)) ) except ValueError, e: logging.debug( "Value error occured. Check your parameters." ) else: if proc.wait() != 0: logging.debug( "Executable '%s' returned with the error: \"%s\"" %(executable,response_stderr) ) return response else: logging.debug( "Executable '%s' returned successfully. First line of response was \"%s\"" %(executable, response_stdout.split('\n')[0] )) return response_stdout def start_wpa(_iface): """ Terminates any running wpa_supplicant process, and then starts a new one. """ run_program("wpa_cli terminate") time.sleep(1) run_program("wpa_supplicant -B -Dwext -i %s -C /var/run/wpa_supplicant -f %s" %(_iface, SUPPLICANT_LOG_FILE)) def get_wnics(): """ Kludgey way to get wireless NICs, not sure if cross platform. """ r = run_program("iwconfig") ifaces=[] for line in r.split("\n"): if "IEEE" in line: ifaces.append( line.split()[0] ) return ifaces def get_networks(iface, retry=10): """ Grab a list of wireless networks within range, and return a list of dicts describing them. """ while retry > 0: if "OK" in run_program("wpa_cli -i %s scan" % iface): networks=[] r = run_program("wpa_cli -i %s scan_result" % iface).strip() if "bssid" in r and len ( r.split("\n") ) >1 : for line in r.split("\n")[1:]: b, fr, s, f = line.split()[:4] ss = " ".join(line.split()[4:]) #Hmm, dirty networks.append( {"bssid":b, "freq":fr, "sig":s, "ssid":ss, "flag":f} ) return networks retry-=1 logging.debug("Couldn't retrieve networks, retrying") time.sleep(0.5) logging.error("Failed to list networks") def _disconnect_all(_iface): """ Disconnect all wireless networks. """ lines = run_program("wpa_cli -i %s list_networks" % _iface).split("\n") if lines: for line in lines[1:-1]: run_program("wpa_cli -i %s remove_network %s" % (_iface, line.split()[0])) def connect_to_network(_iface, _ssid, _type, _pass=None): """ Associate to a wireless network. Support _type options: *WPA[2], WEP, OPEN """ _disconnect_all(_iface) time.sleep(1) if run_program("wpa_cli -i %s add_network" % _iface) == "0\n": if run_program('wpa_cli -i %s set_network 0 ssid \'"%s"\'' % (_iface,_ssid)) == "OK\n": if _type == "OPEN": run_program("wpa_cli -i %s set_network 0 auth_alg OPEN" % _iface) run_program("wpa_cli -i %s set_network 0 key_mgmt NONE" % _iface) elif _type == "WPA" or _type == "WPA2": run_program('wpa_cli -i %s set_network 0 psk "%s"' % (_iface,_pass)) elif _type == "WEP": run_program("wpa_cli -i %s set_network 0 wep_key %s" % (_iface,_pass)) else: logging.error("Unsupported type") run_program("wpa_cli -i %s select_network 0" % _iface) def is_associated(_iface): """ Check if we're associated to a network. """ if "wpa_state=COMPLETED" in run_program("wpa_cli -i %s status" % _iface): return True return False def has_ip(_iface): """ Check if we have an IP address assigned """ status = run_program("wpa_cli -i %s status" % _iface) r = re.search("ip_address=(.*)", status) if r: return r.group(1) return False def do_dhcp(_iface): """ Request a DHCP lease. """ run_program("dhclient %s" % _iface) def main(): print "[--- EViL. Python wireless network manager. ---]" print "[ glenn@sensepost.com / @glennzw\n" parser = argparse.ArgumentParser() parser.add_argument("-n","--nics", help="List wireless network interfaces.", action="store_true") parser.add_argument("-l","--list", help="List wireless networks (specify adapter).", action="store_true") parser.add_argument("-i","--iface", help="Specify interface.") parser.add_argument("-c","--connect", help="Connect to network.", action="store_true") parser.add_argument("-s","--ssid", help="Specify SSID") parser.add_argument("-t","--type", help="Specify network type (OPEN, WEP, WPA, WPA2)") parser.add_argument("-p","--passw", help="Specify password or key.") args = parser.parse_args() if len(sys.argv) < 2: print "[!] No options supplied. Try --help." sys.exit(-1) if args.nics: nics = get_wnics() if nics: print "[+] Available NICs:" for nic in get_wnics(): print nic else: print "[W] No wireless interfaces found :-(" elif args.list: if not args.iface: print "[!] Please specify interface. Use --help for help." sys.exit(-1) else: if args.iface not in get_wnics(): print "[E] Bad interface! - '%s'" % args.iface sys.exit(-1) print "[+] Searching for available networks..." start_wpa(args.iface) networks = get_networks(args.iface) if networks: networks = sorted(networks, key=lambda k: k['sig']) print "[+] Networks in range:" for network in networks: print " SSID:\t%s" % network['ssid'] print " Sig:\t%s" % network['sig'] print " BSSID:\t%s" % network['bssid'] print " Flags:\t%s" % network['flag'] print " Freq:\t%s\n" % network['freq'] else: print "[W] No wireless networks detected :-(" elif args.connect: if not args.iface or not args.ssid or not args.type or (args.type != "OPEN" and not args.passw): print "[E] Missing options for --connect. Check --help for assistance." sys.exit(-1) else: sys.stdout.write( "[+] Associating to '%s' on '%s' (may take some time)... " % (args.ssid, args.iface)) sys.stdout.flush() if args.iface not in get_wnics(): print "[E] No such wireless interface! - '%s'" % args.iface sys.exit(-1) start_wpa(args.iface) connect_to_network(args.iface, args.ssid, args.type, args.passw) while not is_associated(args.iface): time.sleep(1) print "Success." sys.stdout.write("[+] Requesting DHCP lease... ") sys.stdout.flush() do_dhcp(args.iface) while not has_ip(args.iface): time.sleep(1) print "Success. (%s)" % has_ip(args.iface) print "[+] Associated and got lease. Hoorah." if __name__ == "__main__": main() ===== Resources ===== ==== Plotly & Dash ==== - https://python-graph-gallery.com/plotly/ - https://stackoverflow.com/questions/40471026/subplots-with-two-y-axes-each-plotly-and-python-pandas - https://medium.com/@yahyasghiouri1998/dockerize-your-dash-app-f502275475fa - https://devopscube.com/build-docker-image/ - https://stackoverflow.com/questions/76004602/how-do-we-access-the-value-of-dcc-store-in-plotly - https://stackoverflow.com/questions/63225707/plotly-dash-refreshing-global-data-on-reload