====== Python Programming ======
===== Python Virtual Environments =====
==== Create virtual environment inside the project folder ====
python -m venv .venv
==== Activate It ====
.\.venv\Scripts\activate
==== Install Packages ====
pip install
==== Create Requirements.txt ====
pip freeze > requirements.txt
==== Install Packages All at Once ====
pip install -r requirements.txt
==== Deactivate ====
deactivate
===== Distribute Standalone EXE using Pyinstaller =====
**Note: It is highly recommended to download the Python program from python.org directly. Python installed from Microsoft Store may have issues (speaking from my personal experience when wxPython was used)**
- Install pyinstaller package ''pip install -U pyinstaller''
- Execute ''pyinstaller --noconsole --onefile --windowed your_application.py''
===== Regular Expression =====
https://developers.google.com/edu/python/regular-expressions
===== Threading =====
https://christopherdavis.me/blog/threading-basics.html
===== Pandas Dataframe Multiindexing =====
https://towardsdatascience.com/accessing-data-in-a-multiindex-dataframe-in-pandas-569e8767201d
===== My NTAG, I2C, Wifi, Subprocess, Timeout, Argparsing Example =====
**tested on Raspbian Jessie**
#!/usr/bin/env python3
import smbus
import math
import re
import subprocess
import shlex
import os
import time
import signal
import functools
import argparse
# common i2c functions are:
# read_byte(addr)
# write_byte(addr, val)
# read_byte_data(addr, cmd)
# write_byte_data(addr, cmd, val)
# read_word_data(addr, cmd)
# write_word_data(addr, cmd, val)
# read_i2c_block_data(addr, cmd, count)
# write_i2c_block_data(addr, cmd, vals)
class TimedOut(Exception):
pass
def call_with_timeout(timeout, f, *args, **kwargs):
"""Call f with the given arguments, but if timeout seconds pass before
f returns, raise TimedOut. The exception is raised asynchronously,
so data structures being updated by f may be in an inconsistent state.
"""
def handler(signum, frame):
raise TimedOut("Timed out after {} seconds.".format(timeout))
old = signal.signal(signal.SIGALRM, handler)
try:
signal.alarm(timeout)
try:
return f(*args, **kwargs)
finally:
signal.alarm(0)
finally:
signal.signal(signal.SIGALRM, old)
def with_timeout(timeout):
"""Decorator for a function that causes it to timeout after the given
number of seconds.
"""
def decorator(f):
@functools.wraps(f)
def wrapped(*args, **kwargs):
return call_with_timeout(timeout, f, *args, **kwargs)
return wrapped
return decorator
# def run_program(rcmd):
# Runs a program, and it's parameters (e.g. rcmd = 'ls -lh /var/www')
# Returns output if sucessful, or None and logs error if not.
@with_timeout(50)
def run_program(rcmd, wpacli=False):
cmd = shlex.split(rcmd)
try:
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if (wpacli):
for line in iter(proc.stdout.readline, ''):
line = line.decode('utf-8')
if (re.search(r'Trying to associate with', line)) :
print(line, end='')
scanned = re.search(r"Trying to associate with\s*(..:..:..:..:..:..).*?SSID='(.*?)'", line)
if (scanned): print('SSID = "{}", bssid = "{}"'.format(scanned.group(2), scanned.group(1)))
elif (re.search(r'CTRL-EVENT-CONNECTED - Connection to', line)): # connected
print(line, end='')
break
elif (re.search(r'CTRL-EVENT-REGDOM-CHANGE', line)):
print(line, end='')
# break # not connected but could be another ssid to try
elif (re.search(r'CTRL-EVENT', line) or re.search(r'WPA: 4-Way Handshake failed', line) or
re.search(r'WPS-AP-AVAILABLE', line) or re.search(r'Associated with', line) or
re.search(r'WPA: Key negotiation completed with', line)): print(line, end='')
elif (re.search(r'<\d>', line)): print(line, end='')
resp = proc.communicate(input=b'quit\n')
else:
resp = proc.communicate()
except TimedOut:
print('timed out')
proc.terminate()
resp = proc.communicate()
finally:
return resp
# Function to put credentials to /etc/wpa_supplicant/wpa_supplicant.conf
# Returns True if updated or added, False if nothing to do
def addwpa(ssid, psk):
if (not ssid or not psk): return
try:
f = open('/etc/wpa_supplicant/wpa_supplicant.conf', 'r')
wpafile = f.read()
pattern = r'(network\s*=\s*{.*?ssid\s*=\s*"' + ssid + r'".*?psk\s*=\s*")(.*?)(".*?})'
# print('Search pattern: {}'.format(pattern))
network = re.search(pattern, wpafile, re.DOTALL)
if (network): # update exiting network
oldpsk = network.group(2)
if (psk != oldpsk): # only do work if needed
substr = r'\g<1>' + psk + r'\g<3>'
newfile = re.subn(pattern, substr, wpafile, flags=re.DOTALL) # flags are important
f = open('/etc/wpa_supplicant/wpa_supplicant.conf', 'w')
f.write(newfile[0])
f.close()
print('Existing Wifi network "{}" has been updated.'.format(ssid))
return True
else:
print('Existing Wifi network "{}" was up to date.'.format(ssid))
return False
else: # add new network
newnet = 'network={\n\tssid="' + ssid + '"\n\tmode=0\n\tpsk="' + psk + '"\n}\n'
f = open('/etc/wpa_supplicant/wpa_supplicant.conf', 'a')
f.write(newnet)
f.close()
print('New Wifi network "{}" has been added.'.format(ssid))
return True
except (OSError, IOError) as err:
print(err)
def read_ntag():
bus = smbus.SMBus(1)
addr = 0x55
block1 = bus.read_i2c_block_data(addr, 0x01, 16) # read the first user block and check validity
try:
if (block1[1] == 0xFF and block[2] != 0xD1):
raise ValueError('Message has more than 248 characters are not supported')
if (block1[2] != 0xD1): raise ValueError('It is not a well-known type that I can understand.')
if (block1[5] != 0x54): raise ValueError('It is not a text NDEF record that I can interpret.')
recordlen = block1[4]
payload = block1[9:16]
i = 0x2
while i <= 0x2 + math.floor((recordlen-9)/16): # read entire payload record
block = bus.read_i2c_block_data(addr, i, 16)
payload = payload + block
i = i+1
# print(' '.join(format(x, '02X') for x in payload))
del payload[recordlen-2:] # trim the extras
if (payload[recordlen-3] != 0xFE):
raise ValueError('Expecting EOF marking 0xFE. Record length incorrect.')
del payload[recordlen-3:]
payload = ''.join(chr(x) for x in payload)
value = re.findall(r'(["\'])(.*?)\1', payload)
if (len(value)<2): raise ValueError('Please put two values in " " or \' \'.')
return value[0][1], value[1][1] # return ssid, psk
except ValueError as err:
print(err)
finally:
bus.close()
# main function
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-f', '--force', help='force wifi reconnect', action='store_true')
args = parser.parse_args()
username, password = read_ntag()
if (args.force or addwpa(username, password)):
# re-start wpa_supplicant
print('Connecting Wifi ...')
if (not os.path.isfile('/run/wpa_supplicant.wlan0.pid')):
print('wpa_supplicant not running')
resp = run_program('wpa_cli status')
for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='')
# run_program('/sbin/wpa_supplicant -s -B -P /run/wpa_supplicant.wlan0.pid -i wlan0 -D nl80211,wext -c /etc/wpa_supplicant/wpa_supplicant.conf')
else:
print('ifdown wlan0')
resp = run_program('ifdown wlan0')
for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='')
time.sleep(3)
print('ifup wlan0')
resp = run_program('ifup wlan0')
for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='')
resp = run_program('wpa_cli', True) # run wpa_cli in interactive mode to get wpa_supplicant output
for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='')
resp = run_program('wpa_cli status')
for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='')
print('Getting IP address...')
time.sleep(10)
ipaddr = subprocess.check_output(['hostname', '-I'])
print('IP Address: {}'.format(ipaddr))
else: print('Nothing to do')
print('Type "sudo startx" to enter graphic mode.')
if __name__ == '__main__':
main()
===== Subprocess and Wifi =====
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# glenn@sensepost.com / @glennzw
# Handle wireless networking from Python
# The name (evil.py) is a play on 'wicd'
from subprocess import Popen, call, PIPE
import errno
from types import *
import logging
import sys
import logging
import time
import argparse
import re
import shlex
SUPPLICANT_LOG_FILE = "wpa_supplicant.log"
"""
This bit of code allows you to control wireless networking
via Python. I chose to encapsualte wpa_supplicant because
it has the most consistent output with greatest functionality.
Currently supports OPEN, WPA[2], and WEP.
#e.g:
>>> iface = get_wnics()[0]
>>> start_wpa(iface)
>>> networks = get_networks(iface)
>>> connect_to_network(iface, "myHomeNetwork", "WPA", "singehackshackscomounaniƱa")
>>> is_associated(iface)
True
>>> do_dhcp(iface)
>>> has_ip(iface)
True
"""
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(levelname)s %(filename)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='evil.log',
filemode='w')
def run_program(rcmd):
"""
Runs a program, and it's paramters (e.g. rcmd="ls -lh /var/www")
Returns output if successful, or None and logs error if not.
"""
cmd = shlex.split(rcmd)
executable = cmd[0]
executable_options=cmd[1:]
try:
proc = Popen(([executable] + executable_options), stdout=PIPE, stderr=PIPE)
response = proc.communicate()
response_stdout, response_stderr = response[0], response[1]
except OSError, e:
if e.errno == errno.ENOENT:
logging.debug( "Unable to locate '%s' program. Is it in your path?" % executable )
else:
logging.error( "O/S error occured when trying to run '%s': \"%s\"" % (executable, str(e)) )
except ValueError, e:
logging.debug( "Value error occured. Check your parameters." )
else:
if proc.wait() != 0:
logging.debug( "Executable '%s' returned with the error: \"%s\"" %(executable,response_stderr) )
return response
else:
logging.debug( "Executable '%s' returned successfully. First line of response was \"%s\"" %(executable, response_stdout.split('\n')[0] ))
return response_stdout
def start_wpa(_iface):
"""
Terminates any running wpa_supplicant process, and then starts a new one.
"""
run_program("wpa_cli terminate")
time.sleep(1)
run_program("wpa_supplicant -B -Dwext -i %s -C /var/run/wpa_supplicant -f %s" %(_iface, SUPPLICANT_LOG_FILE))
def get_wnics():
"""
Kludgey way to get wireless NICs, not sure if cross platform.
"""
r = run_program("iwconfig")
ifaces=[]
for line in r.split("\n"):
if "IEEE" in line:
ifaces.append( line.split()[0] )
return ifaces
def get_networks(iface, retry=10):
"""
Grab a list of wireless networks within range, and return a list of dicts describing them.
"""
while retry > 0:
if "OK" in run_program("wpa_cli -i %s scan" % iface):
networks=[]
r = run_program("wpa_cli -i %s scan_result" % iface).strip()
if "bssid" in r and len ( r.split("\n") ) >1 :
for line in r.split("\n")[1:]:
b, fr, s, f = line.split()[:4]
ss = " ".join(line.split()[4:]) #Hmm, dirty
networks.append( {"bssid":b, "freq":fr, "sig":s, "ssid":ss, "flag":f} )
return networks
retry-=1
logging.debug("Couldn't retrieve networks, retrying")
time.sleep(0.5)
logging.error("Failed to list networks")
def _disconnect_all(_iface):
"""
Disconnect all wireless networks.
"""
lines = run_program("wpa_cli -i %s list_networks" % _iface).split("\n")
if lines:
for line in lines[1:-1]:
run_program("wpa_cli -i %s remove_network %s" % (_iface, line.split()[0]))
def connect_to_network(_iface, _ssid, _type, _pass=None):
"""
Associate to a wireless network. Support _type options:
*WPA[2], WEP, OPEN
"""
_disconnect_all(_iface)
time.sleep(1)
if run_program("wpa_cli -i %s add_network" % _iface) == "0\n":
if run_program('wpa_cli -i %s set_network 0 ssid \'"%s"\'' % (_iface,_ssid)) == "OK\n":
if _type == "OPEN":
run_program("wpa_cli -i %s set_network 0 auth_alg OPEN" % _iface)
run_program("wpa_cli -i %s set_network 0 key_mgmt NONE" % _iface)
elif _type == "WPA" or _type == "WPA2":
run_program('wpa_cli -i %s set_network 0 psk "%s"' % (_iface,_pass))
elif _type == "WEP":
run_program("wpa_cli -i %s set_network 0 wep_key %s" % (_iface,_pass))
else:
logging.error("Unsupported type")
run_program("wpa_cli -i %s select_network 0" % _iface)
def is_associated(_iface):
"""
Check if we're associated to a network.
"""
if "wpa_state=COMPLETED" in run_program("wpa_cli -i %s status" % _iface):
return True
return False
def has_ip(_iface):
"""
Check if we have an IP address assigned
"""
status = run_program("wpa_cli -i %s status" % _iface)
r = re.search("ip_address=(.*)", status)
if r:
return r.group(1)
return False
def do_dhcp(_iface):
"""
Request a DHCP lease.
"""
run_program("dhclient %s" % _iface)
def main():
print "[--- EViL. Python wireless network manager. ---]"
print "[ glenn@sensepost.com / @glennzw\n"
parser = argparse.ArgumentParser()
parser.add_argument("-n","--nics", help="List wireless network interfaces.", action="store_true")
parser.add_argument("-l","--list", help="List wireless networks (specify adapter).", action="store_true")
parser.add_argument("-i","--iface", help="Specify interface.")
parser.add_argument("-c","--connect", help="Connect to network.", action="store_true")
parser.add_argument("-s","--ssid", help="Specify SSID")
parser.add_argument("-t","--type", help="Specify network type (OPEN, WEP, WPA, WPA2)")
parser.add_argument("-p","--passw", help="Specify password or key.")
args = parser.parse_args()
if len(sys.argv) < 2:
print "[!] No options supplied. Try --help."
sys.exit(-1)
if args.nics:
nics = get_wnics()
if nics:
print "[+] Available NICs:"
for nic in get_wnics():
print nic
else:
print "[W] No wireless interfaces found :-("
elif args.list:
if not args.iface:
print "[!] Please specify interface. Use --help for help."
sys.exit(-1)
else:
if args.iface not in get_wnics():
print "[E] Bad interface! - '%s'" % args.iface
sys.exit(-1)
print "[+] Searching for available networks..."
start_wpa(args.iface)
networks = get_networks(args.iface)
if networks:
networks = sorted(networks, key=lambda k: k['sig'])
print "[+] Networks in range:"
for network in networks:
print " SSID:\t%s" % network['ssid']
print " Sig:\t%s" % network['sig']
print " BSSID:\t%s" % network['bssid']
print " Flags:\t%s" % network['flag']
print " Freq:\t%s\n" % network['freq']
else:
print "[W] No wireless networks detected :-("
elif args.connect:
if not args.iface or not args.ssid or not args.type or (args.type != "OPEN" and not args.passw):
print "[E] Missing options for --connect. Check --help for assistance."
sys.exit(-1)
else:
sys.stdout.write( "[+] Associating to '%s' on '%s' (may take some time)... " % (args.ssid, args.iface))
sys.stdout.flush()
if args.iface not in get_wnics():
print "[E] No such wireless interface! - '%s'" % args.iface
sys.exit(-1)
start_wpa(args.iface)
connect_to_network(args.iface, args.ssid, args.type, args.passw)
while not is_associated(args.iface):
time.sleep(1)
print "Success."
sys.stdout.write("[+] Requesting DHCP lease... ")
sys.stdout.flush()
do_dhcp(args.iface)
while not has_ip(args.iface):
time.sleep(1)
print "Success. (%s)" % has_ip(args.iface)
print "[+] Associated and got lease. Hoorah."
if __name__ == "__main__":
main()
===== Resources =====
==== Plotly & Dash ====
- https://python-graph-gallery.com/plotly/
- https://stackoverflow.com/questions/40471026/subplots-with-two-y-axes-each-plotly-and-python-pandas
- https://medium.com/@yahyasghiouri1998/dockerize-your-dash-app-f502275475fa
- https://devopscube.com/build-docker-image/
- https://stackoverflow.com/questions/76004602/how-do-we-access-the-value-of-dcc-store-in-plotly
- https://stackoverflow.com/questions/63225707/plotly-dash-refreshing-global-data-on-reload