Python Programming
Python Virtual Environments
Create virtual environment inside the project folder
python -m venv .venv
Activate It
.\.venv\Scripts\activate
Install Packages
pip install <package-name>
Create Requirements.txt
pip freeze > requirements.txt
Install Packages All at Once
pip install -r requirements.txt
Deactivate
deactivate
Distribute Standalone EXE using Pyinstaller
Note: It is highly recommended to download the Python program from python.org directly. Python installed from Microsoft Store may have issues (speaking from my personal experience when wxPython was used)
- Install pyinstaller packagepip install -U pyinstaller
- Executepyinstaller –noconsole –onefile –windowed your_application.py
Regular Expression
Threading
Pandas Dataframe Multiindexing
My NTAG, I2C, Wifi, Subprocess, Timeout, Argparsing Example
tested on Raspbian Jessie
#!/usr/bin/env python3
import smbus
import math
import re
import subprocess
import shlex
import os
import time
import signal
import functools
import argparse
# common i2c functions are:
# read_byte(addr)
# write_byte(addr, val)
# read_byte_data(addr, cmd)
# write_byte_data(addr, cmd, val)
# read_word_data(addr, cmd)
# write_word_data(addr, cmd, val)
# read_i2c_block_data(addr, cmd, count)
# write_i2c_block_data(addr, cmd, vals)
class TimedOut(Exception):
	pass
def call_with_timeout(timeout, f, *args, **kwargs):
	"""Call f with the given arguments, but if timeout seconds pass before
	f returns, raise TimedOut. The exception is raised asynchronously,
	so data structures being updated by f may be in an inconsistent state.
	"""
	def handler(signum, frame):
		raise TimedOut("Timed out after {} seconds.".format(timeout))
	old = signal.signal(signal.SIGALRM, handler)
	try:
		signal.alarm(timeout)
		try:
			return f(*args, **kwargs)
		finally:
			signal.alarm(0)
	finally:
		signal.signal(signal.SIGALRM, old)
def with_timeout(timeout):
	"""Decorator for a function that causes it to timeout after the given
	number of seconds.
	"""
	def decorator(f):
		@functools.wraps(f)
		def wrapped(*args, **kwargs):
			return call_with_timeout(timeout, f, *args, **kwargs)
		return wrapped
	return decorator
# def run_program(rcmd):
# Runs a program, and it's parameters (e.g. rcmd = 'ls -lh /var/www')
# Returns output if sucessful, or None and logs error if not.
@with_timeout(50)
def run_program(rcmd, wpacli=False):
	cmd = shlex.split(rcmd)
	try:
		proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
		if (wpacli):
			for line in iter(proc.stdout.readline, ''):
				line = line.decode('utf-8')
				if (re.search(r'Trying to associate with', line)) :
					print(line, end='')
					scanned = re.search(r"Trying to associate with\s*(..:..:..:..:..:..).*?SSID='(.*?)'", line)
					if (scanned): print('SSID = "{}", bssid = "{}"'.format(scanned.group(2), scanned.group(1)))
				elif (re.search(r'CTRL-EVENT-CONNECTED - Connection to', line)): # connected
					print(line, end='')
					break
				elif (re.search(r'CTRL-EVENT-REGDOM-CHANGE', line)):
					print(line, end='')
#					break # not connected but could be another ssid to try
				elif (re.search(r'CTRL-EVENT', line) or re.search(r'WPA: 4-Way Handshake failed', line) or
					re.search(r'WPS-AP-AVAILABLE', line) or re.search(r'Associated with', line) or
					re.search(r'WPA: Key negotiation completed with', line)): print(line, end='')
				elif (re.search(r'<\d>', line)): print(line, end='')
			resp = proc.communicate(input=b'quit\n')
		else:
			resp = proc.communicate()
	except TimedOut:
		print('timed out')
		proc.terminate()
		resp = proc.communicate()
	finally:
		return resp
# Function to put credentials to /etc/wpa_supplicant/wpa_supplicant.conf
# Returns True if updated or added, False if nothing to do
def addwpa(ssid, psk):
	if (not ssid or not psk): return
	try:
		f = open('/etc/wpa_supplicant/wpa_supplicant.conf', 'r')
		wpafile = f.read()
		pattern = r'(network\s*=\s*{.*?ssid\s*=\s*"' + ssid + r'".*?psk\s*=\s*")(.*?)(".*?})'
#		print('Search pattern: {}'.format(pattern))
		network = re.search(pattern, wpafile, re.DOTALL)
		if (network): # update exiting network
			oldpsk = network.group(2)
			if (psk != oldpsk): # only do work if needed
				substr = r'\g<1>' + psk + r'\g<3>'
				newfile = re.subn(pattern, substr, wpafile, flags=re.DOTALL) # flags are important
				f = open('/etc/wpa_supplicant/wpa_supplicant.conf', 'w')
				f.write(newfile[0])
				f.close()
				print('Existing Wifi network "{}" has been updated.'.format(ssid))
				return True
			else:
				print('Existing Wifi network "{}" was up to date.'.format(ssid))
				return False
		else: # add new network
			newnet = 'network={\n\tssid="' + ssid + '"\n\tmode=0\n\tpsk="' + psk + '"\n}\n'
			f = open('/etc/wpa_supplicant/wpa_supplicant.conf', 'a')
			f.write(newnet)
			f.close()
			print('New Wifi network "{}" has been added.'.format(ssid))
			return True
	except (OSError, IOError) as err:
		print(err)
def read_ntag():
	bus = smbus.SMBus(1)
	addr = 0x55
	block1 = bus.read_i2c_block_data(addr, 0x01, 16) # read the first user block and check validity
	try:
		if (block1[1] == 0xFF and block[2] != 0xD1):
			raise ValueError('Message has more than 248 characters are not supported')
		if (block1[2] != 0xD1): raise ValueError('It is not a well-known type that I can understand.')
		if (block1[5] != 0x54): raise ValueError('It is not a text NDEF record that I can interpret.')
		recordlen = block1[4]
		payload = block1[9:16]
		i = 0x2
		while i <= 0x2 + math.floor((recordlen-9)/16): # read entire payload record
			block = bus.read_i2c_block_data(addr, i, 16)
			payload = payload + block
			i = i+1
	#	print(' '.join(format(x, '02X') for x in payload))
		del payload[recordlen-2:] # trim the extras
		if (payload[recordlen-3] != 0xFE):
			raise ValueError('Expecting EOF marking 0xFE. Record length incorrect.')
		del payload[recordlen-3:]
		payload = ''.join(chr(x) for x in payload)
		value = re.findall(r'(["\'])(.*?)\1', payload)
		if (len(value)<2): raise ValueError('Please put two values in " " or \' \'.')
		return value[0][1], value[1][1] # return ssid, psk
	except ValueError as err:
		print(err)
	finally:
		bus.close()
	
# main function
def main():
	parser = argparse.ArgumentParser()
	parser.add_argument('-f', '--force', help='force wifi reconnect', action='store_true')
	args = parser.parse_args()
	username, password = read_ntag()
	if (args.force or addwpa(username, password)):
		# re-start wpa_supplicant
		print('Connecting Wifi ...')
		if (not os.path.isfile('/run/wpa_supplicant.wlan0.pid')):
			print('wpa_supplicant not running')
			resp = run_program('wpa_cli status')
			for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='')
#			run_program('/sbin/wpa_supplicant -s -B -P /run/wpa_supplicant.wlan0.pid -i wlan0 -D nl80211,wext -c /etc/wpa_supplicant/wpa_supplicant.conf')
		else:
			print('ifdown wlan0')
			resp = run_program('ifdown wlan0')
			for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='')
			time.sleep(3)
			print('ifup wlan0')
			resp = run_program('ifup wlan0')
			for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='')
			resp = run_program('wpa_cli', True) # run wpa_cli in interactive mode to get wpa_supplicant output
			for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='')
			resp = run_program('wpa_cli status')
			for x in list(map(lambda x: x.decode('utf-8'), resp)): print(x, end='')
			print('Getting IP address...')
			time.sleep(10)
			ipaddr = subprocess.check_output(['hostname', '-I'])
			print('IP Address: {}'.format(ipaddr))
	else: print('Nothing to do')
	print('Type "sudo startx" to enter graphic mode.')
if __name__ == '__main__':
	main()
Subprocess and Wifi
#!/usr/bin/env python # -*- coding: utf-8 -*- # [email protected] / @glennzw # Handle wireless networking from Python # The name (evil.py) is a play on 'wicd' from subprocess import Popen, call, PIPE import errno from types import * import logging import sys import logging import time import argparse import re import shlex SUPPLICANT_LOG_FILE = "wpa_supplicant.log" """ This bit of code allows you to control wireless networking via Python. I chose to encapsualte wpa_supplicant because it has the most consistent output with greatest functionality. Currently supports OPEN, WPA[2], and WEP. #e.g: >>> iface = get_wnics()[0] >>> start_wpa(iface) >>> networks = get_networks(iface) >>> connect_to_network(iface, "myHomeNetwork", "WPA", "singehackshackscomounaniña") >>> is_associated(iface) True >>> do_dhcp(iface) >>> has_ip(iface) True """ logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(filename)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='evil.log', filemode='w') def run_program(rcmd): """ Runs a program, and it's paramters (e.g. rcmd="ls -lh /var/www") Returns output if successful, or None and logs error if not. """ cmd = shlex.split(rcmd) executable = cmd[0] executable_options=cmd[1:] try: proc = Popen(([executable] + executable_options), stdout=PIPE, stderr=PIPE) response = proc.communicate() response_stdout, response_stderr = response[0], response[1] except OSError, e: if e.errno == errno.ENOENT: logging.debug( "Unable to locate '%s' program. Is it in your path?" % executable ) else: logging.error( "O/S error occured when trying to run '%s': \"%s\"" % (executable, str(e)) ) except ValueError, e: logging.debug( "Value error occured. Check your parameters." ) else: if proc.wait() != 0: logging.debug( "Executable '%s' returned with the error: \"%s\"" %(executable,response_stderr) ) return response else: logging.debug( "Executable '%s' returned successfully. First line of response was \"%s\"" %(executable, response_stdout.split('\n')[0] )) return response_stdout def start_wpa(_iface): """ Terminates any running wpa_supplicant process, and then starts a new one. """ run_program("wpa_cli terminate") time.sleep(1) run_program("wpa_supplicant -B -Dwext -i %s -C /var/run/wpa_supplicant -f %s" %(_iface, SUPPLICANT_LOG_FILE)) def get_wnics(): """ Kludgey way to get wireless NICs, not sure if cross platform. """ r = run_program("iwconfig") ifaces=[] for line in r.split("\n"): if "IEEE" in line: ifaces.append( line.split()[0] ) return ifaces def get_networks(iface, retry=10): """ Grab a list of wireless networks within range, and return a list of dicts describing them. """ while retry > 0: if "OK" in run_program("wpa_cli -i %s scan" % iface): networks=[] r = run_program("wpa_cli -i %s scan_result" % iface).strip() if "bssid" in r and len ( r.split("\n") ) >1 : for line in r.split("\n")[1:]: b, fr, s, f = line.split()[:4] ss = " ".join(line.split()[4:]) #Hmm, dirty networks.append( {"bssid":b, "freq":fr, "sig":s, "ssid":ss, "flag":f} ) return networks retry-=1 logging.debug("Couldn't retrieve networks, retrying") time.sleep(0.5) logging.error("Failed to list networks") def _disconnect_all(_iface): """ Disconnect all wireless networks. """ lines = run_program("wpa_cli -i %s list_networks" % _iface).split("\n") if lines: for line in lines[1:-1]: run_program("wpa_cli -i %s remove_network %s" % (_iface, line.split()[0])) def connect_to_network(_iface, _ssid, _type, _pass=None): """ Associate to a wireless network. Support _type options: *WPA[2], WEP, OPEN """ _disconnect_all(_iface) time.sleep(1) if run_program("wpa_cli -i %s add_network" % _iface) == "0\n": if run_program('wpa_cli -i %s set_network 0 ssid \'"%s"\'' % (_iface,_ssid)) == "OK\n": if _type == "OPEN": run_program("wpa_cli -i %s set_network 0 auth_alg OPEN" % _iface) run_program("wpa_cli -i %s set_network 0 key_mgmt NONE" % _iface) elif _type == "WPA" or _type == "WPA2": run_program('wpa_cli -i %s set_network 0 psk "%s"' % (_iface,_pass)) elif _type == "WEP": run_program("wpa_cli -i %s set_network 0 wep_key %s" % (_iface,_pass)) else: logging.error("Unsupported type") run_program("wpa_cli -i %s select_network 0" % _iface) def is_associated(_iface): """ Check if we're associated to a network. """ if "wpa_state=COMPLETED" in run_program("wpa_cli -i %s status" % _iface): return True return False def has_ip(_iface): """ Check if we have an IP address assigned """ status = run_program("wpa_cli -i %s status" % _iface) r = re.search("ip_address=(.*)", status) if r: return r.group(1) return False def do_dhcp(_iface): """ Request a DHCP lease. """ run_program("dhclient %s" % _iface) def main(): print "[--- EViL. Python wireless network manager. ---]" print "[ [email protected] / @glennzw\n" parser = argparse.ArgumentParser() parser.add_argument("-n","--nics", help="List wireless network interfaces.", action="store_true") parser.add_argument("-l","--list", help="List wireless networks (specify adapter).", action="store_true") parser.add_argument("-i","--iface", help="Specify interface.") parser.add_argument("-c","--connect", help="Connect to network.", action="store_true") parser.add_argument("-s","--ssid", help="Specify SSID") parser.add_argument("-t","--type", help="Specify network type (OPEN, WEP, WPA, WPA2)") parser.add_argument("-p","--passw", help="Specify password or key.") args = parser.parse_args() if len(sys.argv) < 2: print "[!] No options supplied. Try --help." sys.exit(-1) if args.nics: nics = get_wnics() if nics: print "[+] Available NICs:" for nic in get_wnics(): print nic else: print "[W] No wireless interfaces found :-(" elif args.list: if not args.iface: print "[!] Please specify interface. Use --help for help." sys.exit(-1) else: if args.iface not in get_wnics(): print "[E] Bad interface! - '%s'" % args.iface sys.exit(-1) print "[+] Searching for available networks..." start_wpa(args.iface) networks = get_networks(args.iface) if networks: networks = sorted(networks, key=lambda k: k['sig']) print "[+] Networks in range:" for network in networks: print " SSID:\t%s" % network['ssid'] print " Sig:\t%s" % network['sig'] print " BSSID:\t%s" % network['bssid'] print " Flags:\t%s" % network['flag'] print " Freq:\t%s\n" % network['freq'] else: print "[W] No wireless networks detected :-(" elif args.connect: if not args.iface or not args.ssid or not args.type or (args.type != "OPEN" and not args.passw): print "[E] Missing options for --connect. Check --help for assistance." sys.exit(-1) else: sys.stdout.write( "[+] Associating to '%s' on '%s' (may take some time)... " % (args.ssid, args.iface)) sys.stdout.flush() if args.iface not in get_wnics(): print "[E] No such wireless interface! - '%s'" % args.iface sys.exit(-1) start_wpa(args.iface) connect_to_network(args.iface, args.ssid, args.type, args.passw) while not is_associated(args.iface): time.sleep(1) print "Success." sys.stdout.write("[+] Requesting DHCP lease... ") sys.stdout.flush() do_dhcp(args.iface) while not has_ip(args.iface): time.sleep(1) print "Success. (%s)" % has_ip(args.iface) print "[+] Associated and got lease. Hoorah." if __name__ == "__main__": main()