knowledge_base:programming:python:python3

Python Programming

python -m venv .venv
.\.venv\Scripts\activate
pip install <package-name>
pip freeze > requirements.txt
pip install -r requirements.txt
deactivate

Note: It is highly recommended to download Python directly from python.org. Python installed from the Microsoft Store may have issues (in my personal experience, problems appeared when wxPython was used).

  1. Install the pyinstaller package: pip install -U pyinstaller
  2. Execute: pyinstaller --noconsole --onefile --windowed your_application.py

tested on Raspbian Jessie

#!/usr/bin/env python3
import smbus
import math
import re
import subprocess
import shlex
import os
import time
import signal
import functools
import argparse

# common i2c functions are:
# read_byte(addr)
# write_byte(addr, val)
# read_byte_data(addr, cmd)
# write_byte_data(addr, cmd, val)
# read_word_data(addr, cmd)
# write_word_data(addr, cmd, val)
# read_i2c_block_data(addr, cmd, count)
# write_i2c_block_data(addr, cmd, vals)

class TimedOut(Exception):
	"""Signals that a call did not finish within its allotted time."""

def call_with_timeout(timeout, f, *args, **kwargs):
	"""Invoke f(*args, **kwargs) under a SIGALRM-based time limit.

	An alarm is armed for timeout seconds before f is called. If it goes off
	before f returns, TimedOut is raised from the signal handler, i.e.
	asynchronously, so anything f was in the middle of updating may be left
	half-done. The alarm is cancelled and the previous SIGALRM handler is put
	back whether f returns or raises.
	"""
	def _on_alarm(signum, frame):
		message = "Timed out after {} seconds.".format(timeout)
		raise TimedOut(message)

	previous_handler = signal.signal(signal.SIGALRM, _on_alarm)
	try:
		signal.alarm(timeout)
		try:
			result = f(*args, **kwargs)
		finally:
			# always disarm, even when f raised
			signal.alarm(0)
		return result
	finally:
		signal.signal(signal.SIGALRM, previous_handler)

def with_timeout(timeout):
	"""Build a decorator that limits every call of the decorated function to
	timeout seconds by routing it through call_with_timeout().
	"""
	def decorator(f):
		def limited(*args, **kwargs):
			return call_with_timeout(timeout, f, *args, **kwargs)
		# copy f's name, docstring, etc. onto the wrapper
		return functools.update_wrapper(limited, f)
	return decorator

# run_program(rcmd, wpacli=False):
# Runs a program and its parameters (e.g. rcmd = 'ls -lh /var/www').
# Returns the (stdout, stderr) tuple from communicate().
@with_timeout(50)
def run_program(rcmd, wpacli=False):
	"""Run an external command and return what it printed.

	rcmd is a command line; it is split with shlex and executed without a
	shell. When wpacli is true the child is treated as an interactive
	wpa_cli session: its stdout is followed line by line, selected
	wpa_supplicant events are echoed, and 'quit' is sent once a
	CTRL-EVENT-CONNECTED line is seen or the output ends.

	Returns the (stdout, stderr) tuple of bytes from Popen.communicate().
	If the 50 second limit from the decorator fires while the child is
	running, the child is terminated and its remaining output is returned.
	Raises OSError if the program cannot be started.
	"""
	# Lines containing any of these are echoed as-is while following wpa_cli.
	# CTRL-EVENT-REGDOM-CHANGE is covered by 'CTRL-EVENT'; it deliberately does
	# not end the loop because another SSID may still be tried.
	echo_markers = (
		'CTRL-EVENT',
		'WPA: 4-Way Handshake failed',
		'WPS-AP-AVAILABLE',
		'Associated with',
		'WPA: Key negotiation completed with',
	)
	cmd = shlex.split(rcmd)
	# Start the child outside the try block: if Popen itself fails there is no
	# process to clean up and no output to return, so let OSError surface.
	proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
	try:
		if wpacli:
			# The pipe yields bytes, so end-of-file is b''. A '' sentinel never
			# matches and the loop would spin until the timeout fires.
			for raw in iter(proc.stdout.readline, b''):
				line = raw.decode('utf-8', errors='replace')
				if 'Trying to associate with' in line:
					print(line, end='')
					scanned = re.search(r"Trying to associate with\s*(..:..:..:..:..:..).*?SSID='(.*?)'", line)
					if scanned:
						print('SSID = "{}", bssid = "{}"'.format(scanned.group(2), scanned.group(1)))
				elif 'CTRL-EVENT-CONNECTED - Connection to' in line:
					print(line, end='')
					break  # connected
				elif any(marker in line for marker in echo_markers) or re.search(r'<\d>', line):
					print(line, end='')
			return proc.communicate(input=b'quit\n')
		return proc.communicate()
	except TimedOut:
		print('timed out')
		proc.terminate()
		return proc.communicate()

# Function to put credentials to /etc/wpa_supplicant/wpa_supplicant.conf
# Returns True if updated or added, False if nothing to do
def addwpa(ssid, psk, conf_path='/etc/wpa_supplicant/wpa_supplicant.conf'):
	"""Store the credentials for one Wi-Fi network in a wpa_supplicant config.

	If conf_path already has a network block with this ssid and a quoted psk,
	the psk is replaced when it differs; otherwise a new network block is
	appended.

	ssid      -- network name; matched literally
	psk       -- passphrase; written literally between double quotes
	conf_path -- configuration file to read and update

	Returns True if the file was changed (network updated or added) and
	False if there was nothing to do: empty ssid/psk, psk already up to date,
	or the file could not be read or written (the error is printed).
	"""
	if not ssid or not psk:
		return False
	# re.escape keeps regex metacharacters in the SSID literal, and [^{}]
	# keeps the match inside a single network={...} block so the psk of a
	# neighbouring network is never touched.
	pattern = re.compile(
		r'(network\s*=\s*\{[^{}]*?ssid\s*=\s*"' + re.escape(ssid)
		+ r'"[^{}]*?psk\s*=\s*")(.*?)("[^{}]*?\})',
		re.DOTALL)
	try:
		with open(conf_path, 'r') as f:
			wpafile = f.read()
		network = pattern.search(wpafile)
		if network is None:  # add new network
			newnet = 'network={\n\tssid="' + ssid + '"\n\tmode=0\n\tpsk="' + psk + '"\n}\n'
			with open(conf_path, 'a') as f:
				f.write(newnet)
			print('New Wifi network "{}" has been added.'.format(ssid))
			return True
		if network.group(2) == psk:  # only do work if needed
			print('Existing Wifi network "{}" was up to date.'.format(ssid))
			return False
		# A function replacement inserts psk verbatim; a template string would
		# interpret backslashes and \g<...> sequences inside the passphrase.
		updated = pattern.sub(lambda m: m.group(1) + psk + m.group(3), wpafile)
		with open(conf_path, 'w') as f:
			f.write(updated)
		print('Existing Wifi network "{}" has been updated.'.format(ssid))
		return True
	except (OSError, IOError) as err:
		print(err)
		return False

def read_ntag():
	"""Read two quoted values (ssid, psk) from the text record on the tag.

	Opens I2C bus 1 and reads 16-byte blocks from device address 0x55,
	starting at block 0x01. The first block must describe a well-known-type
	text NDEF record (byte 2 == 0xD1, byte 5 == 0x54); byte 4 is taken as the
	record length and the text starts at byte 9. The text must end with 0xFE
	and contain at least two values enclosed in " " or ' '.

	Returns (ssid, psk) on success. If the record is not acceptable the
	reason is printed and None is returned. I/O errors from the bus are not
	caught. The bus is closed in every case.
	"""
	addr = 0x55
	bus = smbus.SMBus(1)
	try:
		# read the first user block and check validity
		block1 = bus.read_i2c_block_data(addr, 0x01, 16)
		if block1[1] == 0xFF and block1[2] != 0xD1:
			raise ValueError('Message has more than 248 characters are not supported')
		if block1[2] != 0xD1:
			raise ValueError('It is not a well-known type that I can understand.')
		if block1[5] != 0x54:
			raise ValueError('It is not a text NDEF record that I can interpret.')
		recordlen = block1[4]
		payload = block1[9:16]
		# read the blocks that hold the rest of the record
		for blockno in range(0x2, 0x2 + (recordlen - 9) // 16 + 1):
			payload.extend(bus.read_i2c_block_data(addr, blockno, 16))
		# The text is followed by the 0xFE end marker at index recordlen - 3.
		end = recordlen - 3
		if end < 0 or end >= len(payload) or payload[end] != 0xFE:
			raise ValueError('Expecting EOF marking 0xFE. Record length incorrect.')
		text = ''.join(chr(x) for x in payload[:end])
		value = re.findall(r'(["\'])(.*?)\1', text)
		if len(value) < 2:
			raise ValueError('Please put two values in " " or \' \'.')
		return value[0][1], value[1][1]  # return ssid, psk
	except ValueError as err:
		print(err)
		return None
	finally:
		bus.close()

	

# main function
def main():
	"""Read Wi-Fi credentials from the tag and reconnect wlan0 when needed.

	The credentials returned by read_ntag() are stored with addwpa(). If
	that changed the configuration, or -f/--force was given, the interface
	is cycled with ifdown/ifup, wpa_cli is followed until it reports a
	connection, and the resulting IP address is printed.
	"""
	def show(resp):
		"""Print the (stdout, stderr) byte strings returned by run_program()."""
		for chunk in resp:
			print(chunk.decode('utf-8'), end='')

	parser = argparse.ArgumentParser()
	parser.add_argument('-f', '--force', help='force wifi reconnect', action='store_true')
	args = parser.parse_args()

	# read_ntag() returns None (after printing why) when the tag cannot be
	# interpreted; unpacking that directly would raise TypeError.
	credentials = read_ntag()
	username, password = credentials if credentials else (None, None)
	if args.force or addwpa(username, password):
		# re-start wpa_supplicant
		print('Connecting Wifi ...')
		if not os.path.isfile('/run/wpa_supplicant.wlan0.pid'):
			print('wpa_supplicant not running')
			show(run_program('wpa_cli status'))
#			run_program('/sbin/wpa_supplicant -s -B -P /run/wpa_supplicant.wlan0.pid -i wlan0 -D nl80211,wext -c /etc/wpa_supplicant/wpa_supplicant.conf')
		else:
			print('ifdown wlan0')
			show(run_program('ifdown wlan0'))
			time.sleep(3)
			print('ifup wlan0')
			show(run_program('ifup wlan0'))
			# run wpa_cli in interactive mode to get wpa_supplicant output
			show(run_program('wpa_cli', True))
			show(run_program('wpa_cli status'))
			print('Getting IP address...')
			time.sleep(10)
			# check_output returns bytes; decode so the address is not shown as b'...'
			ipaddr = subprocess.check_output(['hostname', '-I']).decode('utf-8').strip()
			print('IP Address: {}'.format(ipaddr))
	else:
		print('Nothing to do')
	print('Type "sudo startx" to enter graphic mode.')

# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
	main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# [email protected] / @glennzw
# Handle wireless networking from Python
# The name (evil.py) is a play on 'wicd'
from subprocess import Popen, call, PIPE
import errno
from types import *
import logging
import sys
import logging
import time
import argparse
import re
import shlex

# Log file name handed to wpa_supplicant's -f option by start_wpa().
SUPPLICANT_LOG_FILE = "wpa_supplicant.log"

"""
This bit of code allows you to control wireless networking
via Python. I chose to encapsulate wpa_supplicant because
it has the most consistent output with the greatest functionality.

Currently supports OPEN, WPA[2], and WEP.

#e.g:

>>> iface = get_wnics()[0]
>>> start_wpa(iface)
>>> networks = get_networks(iface)
>>> connect_to_network(iface, "myHomeNetwork", "WPA", "singehackshackscomounaniña")
>>> is_associated(iface)
True
>>> do_dhcp(iface)
>>> has_ip(iface)
True

"""

# Module-wide logging setup: records of level DEBUG and above go to evil.log,
# which is opened with filemode 'w' every time this module is loaded.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s %(filename)s: %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='evil.log',
                    filemode='w')


def run_program(rcmd):
    """
    Run a program and its parameters (e.g. rcmd="ls -lh /var/www").

    The command line is split with shlex and executed without a shell.

    Returns:
      * the program's stdout as text when it exits with status 0;
      * the (stdout, stderr) tuple when it exits with a non-zero status;
      * None when the command is empty or the program could not be started.
    Every failure is logged.
    """
    cmd = shlex.split(rcmd)
    if not cmd:
        logging.error("No program given to run.")
        return None
    executable = cmd[0]

    try:
        # universal_newlines opens the pipes in text mode, so the output is a
        # str on both Python 2 and Python 3 and can be searched/split by callers.
        proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
        response = proc.communicate()
    except OSError as e:
        if e.errno == errno.ENOENT:
            logging.debug("Unable to locate '%s' program. Is it in your path?", executable)
        else:
            logging.error("O/S error occured when trying to run '%s': \"%s\"", executable, str(e))
        return None
    except ValueError:
        logging.debug("Value error occured. Check your parameters.")
        return None

    response_stdout, response_stderr = response
    # communicate() has already waited for the child, so returncode is set.
    if proc.returncode != 0:
        logging.debug("Executable '%s' returned with the error: \"%s\"", executable, response_stderr)
        return response
    logging.debug("Executable '%s' returned successfully. First line of response was \"%s\"",
                  executable, response_stdout.split('\n')[0])
    return response_stdout


def start_wpa(_iface):
    """
    Ask any running wpa_supplicant to terminate (via wpa_cli), wait a second,
    then launch a fresh one in the background for _iface, logging to
    SUPPLICANT_LOG_FILE.
    """
    launch = "wpa_supplicant -B -Dwext -i %s -C /var/run/wpa_supplicant -f %s" % (
        _iface, SUPPLICANT_LOG_FILE)
    run_program("wpa_cli terminate")
    time.sleep(1)
    run_program(launch)

def get_wnics():
    """
    Kludgey way to get wireless NICs, not sure if cross platform.

    Runs iwconfig and returns the first word of every output line that
    mentions "IEEE". Returns an empty list when iwconfig produced no usable
    output.
    """
    r = run_program("iwconfig")
    if not isinstance(r, str):
        # run_program() hands back None or a tuple when the command is missing
        # or exits with an error; there is nothing to parse then.
        return []
    return [line.split()[0] for line in r.split("\n") if "IEEE" in line]



def get_networks(iface, retry=10):
    """
    Grab a list of wireless networks within range, and return a list of dicts
    describing them (keys: bssid, freq, sig, ssid, flag).

    A scan is requested through wpa_cli and its results are parsed; this is
    attempted up to `retry` times with half a second between attempts.
    Returns None (after logging an error) if no attempt produced results.
    """
    while retry > 0:
        scan = run_program("wpa_cli -i %s scan" % iface)
        # run_program() returns None or a tuple on failure; only text can be searched.
        if isinstance(scan, str) and "OK" in scan:
            r = run_program("wpa_cli -i %s scan_result" % iface)
            lines = r.strip().split("\n") if isinstance(r, str) else []
            if len(lines) > 1 and "bssid" in r:
                networks = []
                for line in lines[1:]:  # first line is the column header
                    # Split off the four leading columns only, so whitespace
                    # inside the SSID is kept exactly as reported.
                    fields = line.split(None, 4)
                    if len(fields) < 4:
                        continue  # not a result row
                    b, fr, s, f = fields[:4]
                    ss = fields[4] if len(fields) > 4 else ""
                    networks.append({"bssid": b, "freq": fr, "sig": s, "ssid": ss, "flag": f})
                return networks
        retry -= 1
        logging.debug("Couldn't retrieve networks, retrying")
        time.sleep(0.5)
    logging.error("Failed to list networks")


def _disconnect_all(_iface):
    """
    Disconnect all wireless networks.

    Lists the networks configured on _iface with wpa_cli and removes each
    one by the id in the first column. Does nothing if the list could not
    be obtained.
    """
    listing = run_program("wpa_cli -i %s list_networks" % _iface)
    if not isinstance(listing, str):
        # None or a tuple means wpa_cli is missing or reported an error
        return
    # Skip the header line and the empty string after the final newline.
    for line in listing.split("\n")[1:-1]:
        fields = line.split()
        if fields:
            run_program("wpa_cli -i %s remove_network %s" % (_iface, fields[0]))


def connect_to_network(_iface, _ssid, _type, _pass=None):
    """
    Associate to a wireless network. Supported _type options:
    *WPA[2], WEP, OPEN

    All existing networks are removed, a new network is added and configured
    through wpa_cli, and it is then selected. Nothing is selected when
    adding the network or setting the SSID fails, or when _type is not
    supported (an error is logged for the latter). Returns None.
    """
    def _arg(value):
        # Single-quote a value so that run_program()'s shlex.split() passes it
        # to wpa_cli as one argument with spaces and double quotes intact.
        return "'" + value.replace("'", "'\"'\"'") + "'"

    _disconnect_all(_iface)
    time.sleep(1)
    if run_program("wpa_cli -i %s add_network" % _iface) != "0\n":
        return
    # wpa_cli expects string values wrapped in double quotes.
    ssid_arg = _arg('"%s"' % _ssid)
    if run_program("wpa_cli -i %s set_network 0 ssid %s" % (_iface, ssid_arg)) != "OK\n":
        return

    if _type == "OPEN":
        run_program("wpa_cli -i %s set_network 0 auth_alg OPEN" % _iface)
        run_program("wpa_cli -i %s set_network 0 key_mgmt NONE" % _iface)
    elif _type == "WPA" or _type == "WPA2":
        # The double quotes must survive shlex.split(), as they do for the
        # SSID above: wpa_supplicant reads a quoted psk as a passphrase and an
        # unquoted one as a 64-hex-digit key, so only the latter goes bare.
        if re.match(r"[0-9A-Fa-f]{64}$", _pass or ""):
            psk_value = _pass
        else:
            psk_value = '"%s"' % _pass
        run_program("wpa_cli -i %s set_network 0 psk %s" % (_iface, _arg(psk_value)))
    elif _type == "WEP":
        # NOTE(review): wpa_supplicant documents the fields wep_key0..wep_key3;
        # confirm that plain "wep_key" is accepted before relying on WEP.
        run_program("wpa_cli -i %s set_network 0 wep_key %s" % (_iface, _arg(str(_pass))))
    else:
        logging.error("Unsupported type")
        return  # nothing usable was configured, so do not select the network

    run_program("wpa_cli -i %s select_network 0" % _iface)
            
def is_associated(_iface):
    """
    Report whether wpa_cli's status output for _iface contains
    "wpa_state=COMPLETED", i.e. whether we are associated to a network.
    """
    status = run_program("wpa_cli -i %s status" % _iface)
    return "wpa_state=COMPLETED" in status

def has_ip(_iface):
    """
    Check if we have an IP address assigned.

    Returns the address found after "ip_address=" in wpa_cli's status output
    for _iface, or False when there is none or the status could not be read.
    """
    status = run_program("wpa_cli -i %s status" % _iface)
    if not isinstance(status, str):
        # run_program() returns None or a tuple on failure; re.search() would
        # raise TypeError on those, so treat them as "no address yet".
        return False
    r = re.search(r"ip_address=(.*)", status)
    return r.group(1) if r else False

def do_dhcp(_iface):
    """
    Run dhclient on _iface to request a DHCP lease.
    """
    command = "dhclient %s" % _iface
    run_program(command)


def main():
    """
    Command-line entry point: list wireless NICs (-n), list networks in range
    on an interface (-l -i IFACE), or connect to a network
    (-c -i IFACE -s SSID -t TYPE [-p KEY]) and request a DHCP lease.

    Exits with status -1 when no options are given, when required options
    are missing, or when the named interface is not a wireless NIC.
    """
    # print(...) with a single argument behaves the same on Python 2 and 3.
    print("[--- EViL. Python wireless network manager. ---]")
    print("[        [email protected] / @glennzw\n")
    parser = argparse.ArgumentParser()
    parser.add_argument("-n","--nics", help="List wireless network interfaces.", action="store_true")
    parser.add_argument("-l","--list", help="List wireless networks (specify adapter).", action="store_true")
    parser.add_argument("-i","--iface", help="Specify interface.")
    parser.add_argument("-c","--connect", help="Connect to network.", action="store_true")
    parser.add_argument("-s","--ssid", help="Specify SSID")
    parser.add_argument("-t","--type", help="Specify network type (OPEN, WEP, WPA, WPA2)")
    parser.add_argument("-p","--passw", help="Specify password or key.")
    args = parser.parse_args()

    if len(sys.argv) < 2:
        print("[!] No options supplied. Try --help.")
        sys.exit(-1)

    if args.nics:
        nics = get_wnics()
        if nics:
            print("[+] Available NICs:")
            # reuse the list fetched above instead of running iwconfig again
            for nic in nics:
                print(nic)
        else:
            print("[W] No wireless interfaces found :-(")
    elif args.list:
        if not args.iface:
            print("[!] Please specify interface. Use --help for help.")
            sys.exit(-1)
        if args.iface not in get_wnics():
            print("[E] Bad interface! - '%s'" % args.iface)
            sys.exit(-1)
        print("[+] Searching for available networks...")
        start_wpa(args.iface)
        networks = get_networks(args.iface)
        if networks:
            # NOTE(review): 'sig' is a string here, so this ordering is
            # lexicographic rather than numeric -- confirm the intended order.
            networks = sorted(networks, key=lambda k: k['sig'])
            print("[+] Networks in range:")
            for network in networks:
                print(" SSID:\t%s" % network['ssid'])
                print(" Sig:\t%s" % network['sig'])
                print(" BSSID:\t%s" % network['bssid'])
                print(" Flags:\t%s" % network['flag'])
                print(" Freq:\t%s\n" % network['freq'])
        else:
            print("[W] No wireless networks detected :-(")
    elif args.connect:
        if not args.iface or not args.ssid or not args.type or (args.type != "OPEN" and not args.passw):
            print("[E] Missing options for --connect. Check --help for assistance.")
            sys.exit(-1)
        sys.stdout.write("[+] Associating to '%s' on '%s' (may take some time)... " % (args.ssid, args.iface))
        sys.stdout.flush()
        if args.iface not in get_wnics():
            print("[E] No such wireless interface! - '%s'" % args.iface)
            sys.exit(-1)
        start_wpa(args.iface)
        connect_to_network(args.iface, args.ssid, args.type, args.passw)
        # Polls once per second with no upper bound, as before.
        while not is_associated(args.iface):
            time.sleep(1)
        print("Success.")
        sys.stdout.write("[+] Requesting DHCP lease... ")
        sys.stdout.flush()
        do_dhcp(args.iface)
        ip_address = has_ip(args.iface)
        while not ip_address:
            time.sleep(1)
            ip_address = has_ip(args.iface)
        print("Success. (%s)" % ip_address)
        print("[+] Associated and got lease. Hoorah.")

# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
  • Last modified: 2024/12/26 23:12
  • by George Wayne