Python Notes
These are my personal notes that I use as a quick reference in my work.
See separate file with Python code snippets
py --version       # Windows
python3 --version  # Linux, outside of a virtual environment
python --version   # in a virtual environment
python -V          # all environments
python -c command [arg]
... quote command with single quotes.
Exit with ^z or quit() or exit(). ^d on *nix
python -m module_name [arg]
Run a module that is somewhere in the system path. Note: no ".py" because this is a module name, not a file name
Sort of equivalent to:
python
import module_name
python -m searches sys.path for the named module (without .py) and executes it as the main module. You can also import it (import to do "help" and see the doc).
python module_name.py [arg]
Multi-line\
command \
with a backslash
# This is a comment
Complex type summary:
Type | Literal | Mutability | Ordered | Access | Empty
---|---|---|---|---|---
String | '...' "..." """...""" | immutable | Y | s[0] is 1st char | ""
List | [a,] or [a,b] | mutable | Y | by offset: l[0] is 1st element | []
Tuple | (a,) or (a, b) | immutable | Y | by offset: t[0] is 1st element | ()
Dict | {"a":b, "c":d} | mutable | N | by key: d["k"]; keys are unique | {}
Set | {a, b} | mutable | N | by iteration; elements are unique | set()
File | n/a | | | |
Note:
A tuple is immutable. But items it contains, such as lists, can be mutable.
User-defined classes are mutable.
Never pass a mutable object as a default value for a parameter.
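A minimal sketch of the gotcha (the function names here are just for illustration):
def bad_append(item, bucket=[]):     # the default list is created once and shared across calls
    bucket.append(item)
    return bucket

def good_append(item, bucket=None):  # use None as the default and create the list inside
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket

print(bad_append(1))   # [1]
print(bad_append(2))   # [1, 2]  <-- surprise: the same list is reused
print(good_append(1))  # [1]
print(good_append(2))  # [2]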
A module is basically a file with a .py extension: a_module.py.
Import into another module with import a_module.
Access the objects in the module with a_module.obj.
The module should be in the current directory or in the PYTHONPATH environment variable (see with sys.path).
Add a path: sys.path.append("full path")
See where the module was found: a_module.__file__
Provide an alias for the module: import a_module as the_module_alias
Import another module's objects into the current module's namespace/symbol table with from a_module import name1, name2.
A module can be executed as a script: python a_module.py. In this case, the "__name__" dunder variable is "__main__". The following skips code when the module is imported, meaning when it is not run as a stand-alone script:
if __name__ == "__main__":
In the file that I am executing: __name__ == "__main__"
In imported modules: __name__ == "module_name"  # the module name is the file name without .py
In functions: the_function_name.__name__ == "the_function_name"
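A minimal sketch of the guard (the module and function names are hypothetical):
# file a_module.py
def main():
    print("running as", __name__)

if __name__ == "__main__":
    main()  # runs with "python a_module.py", but not on "import a_module"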
A package is basically a sub-directory, which we will call a_package.
Be sure to put a file called "__init__.py" in the sub-directory a_package.
Import a module from the package with one of the two following lines:
from a_package import a_module
or
import a_package.a_module as a_mod
With __all__ = ["module1", "module2"] in the __init__.py, the listed modules are automatically loaded when doing from a_package import *
It is considered bad practice to use from a_package import *
Python adds the current directory to sys.path when running a script. See about PYTHONPATH below.
Troubleshooting for package structure:
Is there an __init__.py file in the sub-directory?
Is the PYTHONPATH environment variable set? When using pipenv, the easiest is to add export PYTHONPATH=. to the .env in the root of the project.
Is import pkg_name.module_name added to the top of the file? pkg_name is the name of the sub-directory or sub-directories. There is no .py after the module name.
It looks like you have to think in terms of the Python path sys.path, which is a list of directories. When running a script, the local directory is automatically one of the directories in sys.path. To import modules from another directory, here are the options:
import sys; sys.path.append("full path")
Add an __init__.py to the directory, then import as a package.
Put sys.path.append(r".") at the top of the first module when having difficulty with packaging.
Execute from the root. This works without __init__.py in the root.
File a.py ------------
import b.bmod
print(b.bmod.b_fctn()) # bmod module imports c.cmod
end a.py ------------
File b/__init__.py
File b/bmod.py ------------
import c.cmod
def b_fctn():
    return "calling: " + c.cmod.c_fctn()
end bmod.py ------------
File c/__init__.py
File c/cmod.py ------------
def c_fctn():
    return "Executing " + c_fctn.__name__
end cmod.py ------------
Alternative for file a.py above:
File a.py ------------
from b import bmod
print(bmod.b_fctn()) # Notice: no 'b.'
end a.py ------------
If you are in the b sub-directory, and if the PYTHONPATH is not set to the parent directory, then add the parent to sys.path.
The b/bmod.py file stays the same.
File b/b.py ------------
import sys
sys.path.append("..")
import bmod
print(bmod.b_fctn()) # bmod module imports c.cmod
end b.py ------------
The variable sys.path is a list of strings that determines the interpreter's search path for modules.
It is initialized from the directory containing the input script, the PYTHONPATH environment variable (a list of directory names, with the same syntax as the shell variable PATH), and an installation-dependent default.
Add directories to sys.path with the following code:
import sys
sys.path.append('/some/thing/python')
The best is to set the PYTHONPATH to the root of the project: export PYTHONPATH=.
With this, import the modules with the dot notation:
import folder.module
If you are in a sub-folder of the project's root folder, do: export PYTHONPATH=..
In pipenv, set it in the .env file:
### .env file
export PYTHONPATH=.
See details on package installation: https://packaging.python.org/tutorials/installing-packages/
README.md
.gitignore
LICENSE
Pipfile
app/
    __init__.py
    app.py
docs/
    conf.py
tests/
    test_basic.py
    test_advanced.py
Another structure:
Root:
.git
README.md
.gitignore
Pipfile
Pipfile.lock
abc # dir with source
common # folder
transformers # folder
tests # folder
common # folder for unit tests
transformers # folder for unit tests
integration_test # folder
configs # folder
print(" ", end='')
# end
supresses the end of line
dir(module_name)   # --> sorted list of strings with the names defined in the module
dir()              # --> currently defined names
dir(__builtins__)  # --> built-in names
Start interactive python shell:
import module
help()             # starts the interactive help utility
help(module)       # Shows information on the module (import first). Put docstrings in the .py file too, it helps a lot
help(object)       # Shows information on the object
help(object())     # Shows information on what the object returns
object.__dict__    # Also shows information on the object (note: __dict__, not __dir__)
object().__dict__  # Information on what the object returns
In interactive shell, the _ is the last value
My thoughts on styling:
If pip is not installed, do:
python3 -m ensurepip --default-pip
py -m ensurepip --default-pip
If necessary:
sudo apt install python3-pip
or
python3 -m pip --user
On Windows
py -m pip install ...
On Linux and Mac, do pip3:
pip3 install ...
sudo apt install python-numpy
sudo apt install python-matplotlib
c:\Python34\Scripts\pip.exe install matplotlib
c:\Python27\Scripts\pip.exe install matplotlib
Preferably install in a virtual environment.
Environment variables:
PYTHONHOME: location of the standard Python libraries (default: prefix/lib/pythonversion and exec_prefix/lib/pythonversion)
Set PYTHONHOME to prefix:exec_prefix
Install from a requirements file:
python3 -m pip install -r requirements.txt
Behind a firewall, you may have to add "--trusted-host pypi.org --trusted-host files.pythonhosted.org" as follows:
pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org package...
Upgrade pip:
python3 -m pip install --upgrade pip setuptools wheel
Upgrade any package (notice same syntax as upgrade of pip):
python3 -m pip install --upgrade SomeProject
Replace python
with python3
in a virtual environment
Add these to the PATH environment variable:
C:\Users\..user..\AppData\Local\Programs\Python\Python38-32
C:\Users\..user..\AppData\Local\Packages\PythonSoftwareFoundation.Python. ....\LocalCache\local-packages\Python310\Scripts
If you set the PYTHONPATH env var to the root of the project, all imports will find the modules in the subdirectories.
set PYTHONSTARTUP=PYTHONSTARTUP.txt  # this is a file executed before the first prompt
See https://docs.python.org/3/using/cmdline.html#envvar-PYTHONHOME
Installing a version that is not current
Go to python.org, and look for the list of versions.
Choose a version with an installer, otherwise you will have to run the install scripts.
Or, install with the regular installer, with one of the options:
sudo apt install python3.8
sudo yum install python38
sudo amazon-linux-extras install python3.8
The version seems to be written with the "." for apt (python3.8) and without for yum (python38).
import sys
sys.argv[0]  # this is the command
sys.argv[1]  # first argument
s.py:
import sys
sys.exit(0)
# 0 is successful, 1 or more means error
s.py another option:
raise SystemExit(1)
python3 s.py
ret=$?  # get the return code now (or it is lost when another command is issued)
if [ ${ret} -ne 0 ]
then
# handle error
fi
Am I in a virtual environment? Run where python in Windows, or which python in Linux.
Online coding space: colab.research.google.com
Start by installing:
pip install pipenv
(on linux: pip3)
On Windows, add the following to the PATH environment variable (this assumes installation of python from python.org):
C:\Users\<username>\AppData\Roaming\Python\Python38\Site-Packages
C:\Users\<username>\AppData\Roaming\Python\Python38\Scripts
pip3 list  # list of installed packages
cd directory
pipenv shell --python .....python.exe
pipenv shell --python /usr/bin/python3
#This creates a new file "Pipfile"
#and creates the virtual env: see pipenv --venv
# in case of error, try re-installing: pip install pipenv
pipenv --venv # see where virtual env is stored
pipenv install pandas
pip list # shows all installed packages. Notice "pip" not "pipenv"
deactivate # deactivate the venv
Edit the Pipfile if needed
Move dev packages from [packages] to a section called [dev-packages], such as pytest, pylint, jupyter, ...
Pipfile.lock contains the exact versions of what was installed
pipenv install --ignore-pipfile # this installs the software from the pipfile.lock file instead
# in this way, I can reproduce the environment exactly as I tested it
pipenv install --dev # load with the dev-packages
# delete by removing the directory that is given by 'pipenv --venv'
# or:
cd project_directory_where_the_Pipenv_file_is_located
pipenv --rm
# existing installation:
# After first time, activate simply with :
pipenv shell # restart shell
pipenv install # installs everything in the Pipfile
deactivate # or exit
pipenv graph # shows the installed packages and the dependencies
If necessary, do python3 -m pipenv ...
Exit pipenv: exit or deactivate
For a different version of Python: edit the Pipfile and put version 3.7, then pipenv --python 3.7.
Or better: use virtualenv
Doc: https://pipenv.pypa.io/en/latest/basics/
If I get "Shell for UNKNOWN_VIRTUAL_ENVIRONMENT already activated
" then do "exit
" because I am still in a virtual environment
Run a file without opening a shell
pipenv run python a-py.py
Run in the virtual environment, but without having to do "exit" when done
pipenv run python
python -m venv name_of_virtual_env
cd name_of_virtual_env
Scripts\activate.bat
Scripts\pip install . . .
Scripts\python # to start shell
deactivate # when done
(Set the slashes appropriate for the operating system)
The venv module is standard, meaning that no installation is needed.
Don't put scripts into directory with virtual environment. And add to .gitignore
Delete the env by deleting the directory
pip list  # see all packages
pip freeze > requirements.txt
In another installation, use
pip install -r requirements.txt
Seems better for alternate version of Python
pip install virtualenv
Create environment
virtualenv env1
cd env1
source bin/activate  # Linux/Mac
Scripts\activate     # Windows
Exit:
deactivate
Virtual env with specific version of python. This requires installation of the
specific version (good luck: I have had varying degrees of success).
Initialize the virtual environment:
virtualenv -p path/python.exe a_dir
virtualenv -p C:\path\python.exe a_dir
virtualenv -p /Library/Frameworks/Python.framework/Versions/3.8/bin/python3 the_env
Use req file
pip install -r requirements.txt
List packages:
pip list
pipenv shell and pipenv run automatically load the .env file.
By default, it is at the root of the project. Set another location with PIPENV_DOTENV_LOCATION.
######
# file .env
THE_ENV_VAR=abc
THE_PATH=${HOME}/...:/etc/another/path
######
file aaa.py
import os
env_var = os.environ['THE_ENV_VAR']
the_path = os.environ['THE_PATH']
View all:
for k, v in os.environ.items():
    print(f"{k}: {v}")
pipenv run pytest
pipenv run lint
pipenv run tidy
pipenv run pytest
pylint file.py
pipenv run tidy
pipenv install pytest
pipenv install pylint
https://packaging.python.org/guides/tool-recommendations/
https://realpython.com/pipenv-guide
applications:
pip install -r requirements.txt
package:
setup.py
Configuration for vi:
:set syntax=python
or set the following (to be verified):
syntax enable
set tabstop=4
set shiftwidth=4
set expandtab
filetype indent on
set autoindent
or:
set sw=4 et ts=4 ai
set smartindent cinwords=if,elif,else,for,while,try,except,finally,def,class
Go to the directory where I want to create the egg:
python.exe setup.py bdist_egg
The second to last line of the output has the location of the resulting file
Open a .egg with 7zip to view
Options for passwords, access keys, and secrets
Improve code:
Documentation, being intuitive
Logging to std_out: use lib logging
Options for exception handling:
Orchestration tool: calls the run.py, which has configuration, initialization, and execution
Configuration in config.yaml. What is executed is the tuple (config, executable)
Meta file for job control: the source files that were successfully loaded
install pyyaml
Execute /Applications/Python 3.7/Install Certificates.command
this command replaces the root certificates of the default Python installation with the ones shipped through the certifi package
No declaration needed or even possible
Assignment: a_var = 3
Note: _ (underscore) is a variable. Generally used for a throw-away value
Names:
type(x)             # gives the type of variable x
if type(x) is str:  # tests if the type is string, list, ...
id()                # identity function
However, object type comparisons should always use isinstance() instead of comparing types directly:
Correct:
if isinstance(obj, int):
Wrong:
if type(obj) is type(1):
The "==
" operator tells us if the objects have the same value:
a == b
The "is" keyword tells us if the underlying objects are the same:
a is b
The preceding line is equivalent to
id(a) == id(b)
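Small illustration with two separately built lists:
a = [1, 2, 3]
b = [1, 2, 3]
print(a == b)  # True: same value
print(a is b)  # False: two distinct objects
c = a
print(a is c)  # True: both names refer to the same object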
0o (zero, letter o, in lower or upper case) is octal
0x (zero x) is hexadecimal
0b (zero b) is binary
Convert with hex(), bin(), oct()
a + bj # complex number
Naming conventions:
global xyz  # makes the variable global. Generally considered sloppy programming
Subclass of int
Literals True and False (initial cap, then lower case)
True is equivalent to int 1, False to int 0
Special characters:
\\
\'
\"
\a          # ASCII Bell (BEL)
\b          # ASCII Backspace (BS)
\f          # ASCII Formfeed (FF)
\n          # ASCII Linefeed (LF)
\N{name}    # Character with name in the Unicode database
\r          # ASCII Carriage Return (CR)
\t          # Tab
\uxxxx      # Character with 16-bit hex value xxxx (Unicode only)
\Uxxxxxxxx  # Character with 32-bit hex value xxxxxxxx (Unicode only)
\v          # ASCII Vertical Tab (VT)
\ooo        # Character with octal value ooo
\xhh        # Character with hex value hh
r'literal or raw string'        # raw string: backslashes are not treated as escapes
f'formatted string {variable}'  # inserts the value of the variable
a_str.startswith("start_to_look_for")
a_str.startswith("start_to_look_for", n)  # comparison starts at position n
Unicode:
"\u0394"      # Using a 16-bit hex value
"\U00000394"  # Using a 32-bit hex value
str is the type for text (unicode)
chr(i)  # returns the character with code i. "\u0234" is unicode
ord(c)  # with c as a Unicode character: returns an integer
hex(i)  # returns a hexadecimal string
int(s)  # convert string to integer
ord(c)  # return the character code
a_string[0:2]  # first 2 characters: substring from index 0 (inclusive) to index 2 (exclusive)
print("12345"[0:1]) # --> "1" (first element)
print("12345"[0]) # --> "1" (first element)
print("12345"[0:3]) # --> "123"
print("12345"[0:30]) # --> "12345"
print("12345"[1:-1]) # --> "234"
print("12345"[3:]) # --> "45"
print("12345"[:3]) # --> "123"
print("12345"[:-2]) # --> "123"
print("12345"[2]) # --> "3"
print("12345"[-1]) # --> "5" (last element)
print("12345"[0:0]) # --> empty
print("1234567890"[::3]) # --> "1470" (first in slice, then every third)
print("1234567890"[::-1]) # --> "0987654321" (reverse)
print("1234567890"[4:9:3]) # --> "58" (first in slice, which is "56789", then every third )
print("1234567890"[:]) # --> Copy
"""multi-
line
string"""
len("asd") #
gives length
str1 is str2 #
true if both identifiers refer to the same location
str1 == str2 #
true if the content is the same. str1 is str2 implies str1 == str2, but not the other way
s = s1 + s2 #
concatenate with "+"
s = 'string' 'string2' #
or string literals side by side
s = 'string' \
or one string on each line, with backslashes (end-of-line indicator)
'string2' #
3*'string' #
-->repeats
See operators below
In py3, if s is a string:
s.split(',')    # if the argument is null, then splits based on whitespace. Consecutive white spaces are counted as one.
s.find('asdf')  # finds 'asdf' in the string. -1 if not found
"dog" in "the quick dog."  # determines if a string contains a string
In py2, import the string module and do the following:
string.split(s, ',')    # if null, then splits based on whitespace. Consecutive white spaces counted as one.
str.split()             # alternate
string.find(s, 'asdf')  # find the 'asdf' in the string. -1 if not found
escape: (need followup)
repr(s)
triple quotes
"\""   # contains one char: "
r"\""  # contains two chars: \ and " (this is a raw string)
b"\""  # bytes, see below
f"text {expr} text"  # formatting, see below
a_str.replace('"', '\\"').replace("'", "\\'")
a_str.rstrip('\r\n') #
remove combinations of trailing line feeds and carriage returns
a_str.startswith(begin_str) #
return true if a_str starts with beginstr
a_str.startswith(begin_str, n) #
comparison starts at position n
a_str.ljust(5, ' ') #
Left justify, and pad on the right
Prepare map for patterns:
mp = str.maketrans('abcdefghijklmnopqrstuvwxyz' + 'abcdefghijklmnopqrstuvwxyz'.upper() + '0123456789', 'a'*26 + 'A'*26 + '9'*10)
Translate:
a_string.translate(mp)
Show character code instead of character for special characters:
''.join([c if ord(c)<255 else "[" + str(ord(c)) + "]" for c in list(the_string)])
try out and complete documentation
f"text {var} or {expression}"
f"text {numeric:8.2f}" #
2 digits after the decimal point, and 8 characters in all
If the output does not fit, it is expanded.
f"text {expression=}" #
with "=" before the colon ":", the expression precedes the value.
Some options:
'>5.2f;'
Right align within 5 characters for a float with 2 after the decimal'^5'
Center within 5 characters for a string'0'
Left pad with zeros','
Add a comma for thousands separator'+'
Show sign for both positive and negative numbers'-'
Show sign for only negative numbers'5i', '5d'
integer with sign, 5 spaces or more 'o'
Unsigned octal'x', 'X'
hexadecimal (lower , upper case)'e', 'E'
exponential (lower, upper case).'g', 'G'
Equivalent to 'f'
, or to 'e'
for large or small numbers 'c'
single character'r'
string (based on repr()).'s'
string (based on str()).Literal curly brackets: {{
and }}
.
Note that single quotes have to be used inside the curly brackets when the f-string is surrounded by double quotes, and vice versa.
Note that you cannot use a backslash inside the curly brackets.
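A few of the options above in one quick sketch:
n = 1234567.891
s = "abc"
print(f"{n:>15,.2f}")  # right align in 15 chars, thousands separator, 2 decimals: '   1,234,567.89'
print(f"{s:^7}")       # center in 7 chars: '  abc  '
print(f"{42:08d}")     # pad with zeros: '00000042'
print(f"{n=:.1f}")     # '=' shows the expression: 'n=1234567.9'
print(f"{{literal braces}}")  # '{literal braces}'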
bytes is the type for data or 8-bit strings; a literal is b"asdf".
They can be considered lists of small integers (0-255).
"Déjà vu".encode('utf-8') gives b'D\xc3\xa9j\xc3\xa0 vu'
UTF-8 is the de-facto standard: it is generally safe to assume encoding="utf-8".
Convert string to bytes (both options are equivalent):
byts = bytes("abc", "utf-8")
byts = "abc".encode("utf-8")
To bytes (two hexadecimal digits per byte; ignores white space):
bytes.fromhex('2Ef0 F1f2 ')
From bytes to a hex string (the separator is optional, -2 groups every two hexadecimal digits, starting from the left):
b'2ef0 f1f2'.hex(" ", -2)
bytearray objects are a mutable counterpart to bytes objects.
Write bytes to and read bytes from files:
open(name, "rb")
open(name, "wb")
You cannot specify the encoding in binary mode.
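A round trip between str and bytes, plus writing and reading the raw bytes (the file name is hypothetical):
byts = "Déjà vu".encode("utf-8")   # str -> bytes
txt = byts.decode("utf-8")         # bytes -> str
with open("dump.bin", "wb") as f:  # write raw bytes
    f.write(byts)
with open("dump.bin", "rb") as f:  # read raw bytes
    assert f.read() == byts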
+   # concatenation
"abc" * 3  # repeat 3 times, referencing the same element. Leads to odd cases resembling pointer issues in C
//  # floor division (division of two integers with result as integer)
from __future__ import division  # Python 2 only: makes "/" behave as true division
x > 10 and x < 20 is equivalent to 10 < x < 20
+= -=  # a += b is a = a + b
Operator precedence (low to high)
Operators | Comments
---|---
lambda |
x if condition else y | Conditional expression
or |
and |
not x |
in, not in, is, is not, <, <=, >, >=, <>, !=, == | Comparisons, membership, identity
| | Bitwise OR
^ | Bitwise XOR
& | Bitwise AND
<<, >> | Shift operators
+, - | Addition and subtraction
*, /, //, % | Multiplication, division, remainder
+x, -x, ~x | Unary plus, unary minus, bitwise NOT
** | Exponentiation
x[index], x[index:index], x(arguments...), x.attribute | Subscription, slicing, call, attribute reference
(expressions...), [expressions...], {key: value...}, 'expressions...' | Binding or tuple display, list display, dictionary display, string conversion
Method overriding cases, comparing main.py, child_kls.py, and generic_kls.py (the code examples are not shown here):
ghost calls ghost: not an issue
non-ghost calls ghost: not an issue
non-ghost calls non-ghost: different results, but not sneaky
ghost calls method not in parent: not an issue
non-ghost calls method not in parent: not an issue. The different names make it clear.
TO DO:
Group by:
Convert from date or time or datetime to string, including timezone
Display a time or a date (what does not show in above)
Convert from string to date or time, including tz
Add, subtract days, hours
Display delta
Convert delta to int: first to seconds with timedelta_var.total_seconds()
Then to minutes, hours, etc.
Current date or time:
import datetime as dt
dt.datetime.now()
import time
time.strftime("%Y-%m-%d", time.localtime())
Naive means the object has no timezone information, aware means it does.
A datetime object d is aware if both of the following hold:
d.tzinfo is not None
d.tzinfo.utcoffset(d) does not return None. Notice "d" as the parameter
A time object t is aware if both of the following hold:
t.tzinfo is not None
t.tzinfo.utcoffset(None) does not return None. Notice "None" as the parameter
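Quick check of these rules with the built-in timezone.utc (no pytz needed):
from datetime import datetime, timezone
naive = datetime.now()
aware = datetime.now(timezone.utc)
print(naive.tzinfo)                                 # None --> naive
print(aware.tzinfo, aware.tzinfo.utcoffset(aware))  # UTC 0:00:00 --> aware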
import time
curr_dttm = time.strftime("%Y%m%d") + "_" + time.strftime("%H%M%S")
strptime(date_string, format)
import datetime as dt
curr_dttm = dt.datetime.strftime(dt.datetime.now(), "%Y%m%d_%H%M%S")
curr_dttm = dt.datetime.strftime(dt.datetime.now(), "%Y%m%d") + "_" + dt.datetime.strftime(dt.datetime.now(), "%H%M%S")
datetime.datetime.now().isoformat(sep='T', timespec='microseconds')
# timespec in ['auto', 'hours', 'minutes', 'seconds', 'milliseconds', 'microseconds']. Do not use utcnow() because some methods are naive.
datetime.fromtimestamp(timestamp, tz=timezone.utc)  # returns a datetime from a POSIX timestamp; datetime.timestamp() is the inverse
datetime.fromisoformat(date_string) and datetime.now().isoformat() are inverse operations
ISO format: YYYY-MM-DD[*HH[:MM[:SS[.fff[fff]]]][+HH:MM[:SS[.ffffff]]]]
where * can match any single character.
datetime.astimezone(tz=None)  # if tz is None, then return in the system timezone
import time; time.time()      # current epoch time
from module datetime:
class datetime.date      # naive date
class datetime.time      # time independent of date, includes attribute tzinfo
class datetime.datetime  # date and time, includes attribute tzinfo.
In the constructor, the year, month, and day are mandatory; hour, sec, etc. default to 0
class datetime.timedelta # constructor: datetime.timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0). Arguments may be < 0 or > 0
class datetime.tzinfo
from datetime import datetime  # datetime objects
datetime.date(year, month, day) returns a date
datetime.time(hour, minute, second, microsecond, tzinfo)
datetime.datetime(year, month, day, hour, minute, second, microsecond, tzinfo)
Note: objects of the date type are always naive, meaning that they are not aware of the time zone
datetime.today()    # class method
datetime.now([tz])
datetime.utcnow()
datetime.combine(date, time)
dt of type datetime:
dt.strftime(format)
dt.year  # class attribute
dt.month
dt.day
dt.hour
dt.minute
dt.second
dt.microsecond
dt.tzinfo
dt.date()  # instance method
dt.time()
dt.timetz()
import time
t1=time.perf_counter()
.......
t2=time.perf_counter()
print (t2-t1) # in seconds
dt + timedelta
dt - timedelta
dt2 - dt1 --> timedelta
dt2 < dt1
from datetime import timedelta
timedelta([days[, seconds[, microseconds[, milliseconds[, minutes[, hours[, weeks]]]]]]])
All arguments optional / default to 0. Arguments may be ints, longs, or floats, and may be positive or negative. Down to microsecond resolution.
example: timedelta(weeks=40, days=84, hours=23)
Operations: + -
* integer or long
t2 // integer
+t
-t
abs(t)
str(t), repr(t)
from datetime import date
date.today() == date.fromtimestamp(time.time())
dt of type date:
dt.year dt.month dt.day
dt.weekday : 0=Mon, 6=Sun
dt.isoweekday : 1=Mon, 7=Sun
dt.isocalendar() --> 3-tuple, (ISO year, ISO week number, ISO weekday)
dt.isoformat() --> date in ISO 8601 format, 'YYYY-MM-DD'
dt.strftime(format)
dt + timedelta
dt - timedelta
dt2 - dt1 --> timedelta
dt2 < dt1
import time
time.sleep(5)  # sleep 5 seconds
Convert a delta to a number: first convert to seconds with timedelta_var.total_seconds(), then to minutes, hours, etc.
Or divide directly with timedelta(days=1), timedelta(hours=1), etc.:
timedelta_var / timedelta(days=1)
timedelta_var / timedelta(hours=1)
timedelta_var / timedelta(minutes=1)
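Example of both conversions on a made-up 90-minute delta:
from datetime import timedelta
td = timedelta(hours=1, minutes=30)
print(td.total_seconds())         # 5400.0
print(td / timedelta(minutes=1))  # 90.0
print(td / timedelta(hours=1))    # 1.5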
datetime.strptime(a_date, "%Y-%m-%d").date()
# .date() to get just the date, excluding time
# from date object to string:
date_obj.strftime("%Y-%m-%d")
# set a date object:
datetime(2022, 1, 1).date()
(dt.datetime.combine(dt.datetime.now().date(), dt.time(0,0)) + td).strftime("%H:%M")
Use time.perf_counter() (not time.time()) for time stamps when analyzing performance.
import datetime as dt
import pytz
# py -m pip install pytz
See all time zones with: pytz.all_timezones
Current time in a specific timezone:
print(dt.datetime.now(pytz.timezone('US/Eastern')))
Sample script:
my_tz_lst = [ {"abbr": "West", "nm": "US/Pacific"}, {"abbr": "Gal", "nm": "Pacific/Galapagos"}, {"abbr": "Andes", "nm": "America/Lima"}, {"abbr": "BOS", "nm": "US/Eastern"}, {"abbr": "NL", "nm": "America/St_Johns"}, # Newfoundland {"abbr": "IS", "nm": "Atlantic/Reykjavik"}, {"abbr": "UTC", "nm": "UTC"}, {"abbr": "Dtza", "nm": "Africa/Bamako"}, {"abbr": "Eur", "nm": "Europe/Paris"}, {"abbr": "ZNZ", "nm": "Africa/Dar_es_Salaam"}, {"abbr": "Ind", "nm": "Asia/Kolkata"}, {"abbr": "Phi", "nm": "Asia/Manila"}, ] for t in my_tz_lst: print(f"|{t['abbr']:^7}", end="") print("|") for t in my_tz_lst: print(f"| {dt.datetime.strftime(dt.datetime.now(pytz.timezone(t['nm'])), '%H:%M'):5} ", end="") print("|")
import csv
#https://docs.python.org/3.4/library/csv.html
fn_in = "test_csv_in.csv"
fn_out = "test_csv_out.csv"
with open(fn_in, newline='') as fi:
    with open(fn_out, 'w', newline='') as fo:
        lines_in = csv.reader(fi, delimiter=',', quotechar='"')
        lines_out = csv.writer(fo, delimiter=';', quotechar='|', quoting=csv.QUOTE_MINIMAL)
        for row_in in lines_in:
            lines_out.writerow(row_in)
CSV Format
Excel File
For sqlite, see Database Connections.
import pymysql, pandas as pd
Question: how do you escape a literal "%" in the query string? (See note below.)
db_conn = pymysql.connect(host=db_name, user=mysql_username, password=mysql_password, database=db_schema,charset="utf8")
the_params = {"abc":var1, "defg": var_b + "%"}
the_sql = "select ... where col1 = %(abc)s and col_b like %(defg)s ;"
just_assets = pd.read_sql_query(the_sql , db_conn, params=the_params)
# or (not tested yet)
the_params = (var1, var_b + "%")
the_sql = "select ... where col1 = %s and col_b like %s ;"
"%"
? "%%"
does not seem to work.
An error occurs using the connector from mysql.connector
Note that "--,"
throws an error in MySQL. Put a space after the dash-dash
A plot consists of a figure and one or more axes.
http://matplotlib.org/users/pyplot_tutorial.html
https://matplotlib.org/stable/gallery/index.html
c:\Python27\Scripts\pip.exe install matplotlib
sudo yum install python-matplotlib
import matplotlib.pyplot as plt
...
plt.plot(x, y)        # line plot
plt.scatter(x, y)     # scatter plot
plt.hist(x)           # histogram
plt.hist(x, bins=20)  # histogram with 20 buckets (default is 10)
plt.imshow(a_matrix)  # plots a matrix as an image
plt.imshow(a_matrix, cmap='gray')  # plots a matrix as an image in gray scale
plt.xlabel("...")
plt.ylabel("...")
plt.title("...")
plt.show()
r=np.random.randn(10000,2)
plt.scatter(r[:,0],r[:,1])
plt.savefig() to write to file
subplot(nrows, ncols, plot_number)
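A sketch of the same kind of calls in the figure/axes (object-oriented) form mentioned above; the data is made up:
import numpy as np
import matplotlib.pyplot as plt
x = np.linspace(0, 10, 100)
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2)  # one figure, two axes
ax1.plot(x, np.sin(x))
ax1.set_title("line")
ax2.hist(np.random.randn(1000), bins=20)
ax2.set_title("histogram")
fig.savefig("two_plots.png")  # or plt.show()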
MNIST dataset: https://kaggle.com/c/digit-recognizer
http://effbot.org/tkinterbook/tkinter-index.htm
https://wiki.python.org/moin/TkInter
http://www.tcl.tk/man/tcl8.4/TkCmd/contents.htm
import Tkinter  # capital T in py 2, lowercase in py 3
import tkMessageBox
tkMessageBox.showinfo(title, text)
# instead of showinfo: showwarning, showerror, askquestion, askokcancel, askyesno, and askretryignore
#### simple GUI
from Tkinter import *
root = Tk()  # top level window is called root by convention
my_frame1 = Frame(root)
my_frame1.pack()  # 3 geometry managers: pack, grid, place
# add this after packing the container:
b1 = Button(my_frame1)
b1["text"] = "Hi there"
b1["background"] = "green"
b1.pack()
root.mainloop()  # waiting
#### same GUI, but with a class
from Tkinter import *
class App:
    def __init__(self, the_parent):
        self.parent = the_parent  # remember the parent
        self.my_frame1 = Frame(self.parent)
        self.my_frame1.pack()
        self.b1 = Button(self.my_frame1)
        self.b1["text"] = "Hello, World!"
        self.b1["background"] = "green"
        self.b1.pack()
        self.b1.bind("<Button-1>", self.b1Click)
        self.b2 = Button(self.my_frame1)
        self.b2.configure(text="Hello, World!", background="green")
        self.b2.pack()
        self.b2.bind("<Button-1>", self.b2Click)
        self.b3 = Button(self.my_frame1, text="Hello, World!", background="green")
        self.b3.pack()
        self.b3.bind("<Button-1>", self.b3Click)
root = Tk()
app = App(root)
root.mainloop()
# binding
widget.bind(event_type_name, event_handler_name)
# Buttons
width attribute is in characters
command handler expects button press AND button release
padx, pady: string with unit. "2m" is 2 mm
# Frame parameters
borderwidth=5
relief=RIDGE
height=50 width=50  # Note: often ignored when widgets are added
background="white"
# Frame packing
frames have internal and external padding: ipadx, ipady and padx, pady
The padding is specified when packing
Internal is around the widgets
side=TOP | BOTTOM | LEFT | RIGHT
fill=BOTH
expand=YES
Note:
when packing, there is a cavity
As widgets are added, they claim area in the cavity, but do not use it all.
With option expand=YES, the widget claims the whole area, but will not use all of it.
With option fill, it will also grow to use the whole area. fill=X, fill=Y, or fill=BOTH. fill=NONE does not let it grow.
With option anchor="N", the widget's position will be at top. Other values: NE (north east), CENTER, ...
See tt100.py in the thinking_in_tkinter directory
Code that is ready to use:
import sqlite3
sqlite_db_filename = "temp_test.db"
with sqlite3.connect(sqlite_db_filename) as cn:
    # The context manager handles the transaction: if there are no errors, it is committed
    # when leaving the context; if an error occurs, it is rolled back.
    # Note: the context manager does not close the connection itself.
    # "autocommit" is False by default.
    cr = cn.cursor()
    # no contexts with cursors unfortunately
    cr.execute("drop table if exists t")
    cr.execute("create table t (n integer, a text)")
    cr.execute("insert into t (n, a) values (1, 'aa')")
    cr.execute("insert into t (n, a) values (2, 'bb')")
    # Bulk insert:
    # cn.executemany("insert into a_table (a,b,c) values (?,?,?)", list_of_tuples)
    cn.commit()
    # force a commit so that a subsequent failed operation does not make this get rolled back.
    # Note that there is no "begin transaction". A transaction begins when the connection is created,
    # just after a commit, or just after a rollback.
    # If there is no "try except", then execution stops and the transaction is rolled back.
    # This is fine for ad-hoc scripts. For more robust applications, trap the error with "try except".
    try:
        cr.execute("insert into t (n, a) values (0, 'x')")  # this should be successful
    except Exception as e:
        cn.rollback()
        print("rollback 1")
    else:
        cn.commit()
        print("commit 1")
    try:
        cr.execute("insert into t (n, b) values (1, 'x')")  # this should fail (note the field name)
    except Exception as e:
        cn.rollback()
        print("rollback 2")
    else:
        cn.commit()
        print("commit 2")
    sql_q = """select n, a
               from t
               where n = ?
            """
    # sqlite also accepts named parameters with the :name syntax
    rslt_lst = [r for r in cr.execute(sql_q, (2, ))]
    # this runs through the cursor, so you can reuse the cursor for another query
    print(cr.description)  # show column names
    for fld_a, fld_b in rslt_lst:  # this puts a name on the fields so as not to use positions, which can be confusing
        # put "fld_..." to help identify the results of the query
        print(f"{fld_a} {fld_b}")
    cr.close()
    # no contexts with cursors unfortunately; close explicitly
When working with cursors:
Close cursors after use.
Do connection_object.commit() after any inserts, updates, or deletes.
Put the results in a list or create a second cursor if you are inside a loop that loops through the first cursor.
import pyodbc
sources = pyodbc.dataSources()
print(sources.keys())  # show the defined ODBC sources
# I had installed Python 32 bit and was trying to connect via ODBC 64 bit.
# I re-installed Python, but the 64 bit version. It is working.
C:\Python37\scripts\pip.exe install cx_oracle
import cx_Oracle  # upper case "O"
Note that the ocbc or oracle connection objects must have the same architecture (32 bit or 64 bit) as the version of Python
For some reason, the 32-bit version of Python was first installed (surprising in this day and age - 2019). I forced the installation of 64 bit Python and it worked
def is32or64():
    import sys
    if sys.maxsize == 2147483647:
        print("Python 32bit version")
    elif sys.maxsize == 9223372036854775807:
        print("Python 64bit version")
    else:
        print("sys.maxsize=", sys.maxsize)
Python code:
import cx_Oracle
def call_myquery(qry: str, params: dict):
    # params is a dictionary in the form {"bind1": data1, "bind2": data2}
    # The SQL has the bind variables as follows:
    # select ... from ... where col1 = :bind1 and col2 = :bind2
    # Do not put quotes around the bind variables
    # No ';' at the end of the query
    rslt_lst = []
    with cx_Oracle.connect(username, password, "host:port/database") as connection:
        with connection.cursor() as cursor:
            try:
                results = cursor.execute(qry, params)
                cols = [fld[0] for fld in cursor.description]
                # cursor.description contains
                # name, type, display_size, internal_size, precision, scale, null_ok
                # for each column; fld[0] is just the name
                rslt_lst = [{c: r for c, r in zip(cols, row)} for row in results]
            except Exception as e:
                print(f"Error on executing {qry} due to {e}")
            # cursor.close() not needed with the "with" statement
        # connection.close() not needed with the "with" statement
    return rslt_lst
qry = "select ... where a like :the_bind_var || '%' "
params = {"the_bind_var": "a value"}
rslt = call_myquery(qry, params)
Stored Procedure:
CREATE OR REPLACE PROCEDURE myproc (a IN VARCHAR2, c IN OUT SYS_REFCURSOR) AS
BEGIN
OPEN c FOR SELECT * FROM myschema.mytable WHERE aa = a;
END;
/
Python code:
import cx_Oracle
def call_myproc(a):
    with cx_Oracle.connect("username", "password", "hostname:port/service_name") as connection:
        with connection.cursor() as cursor:
            # Declare a cursor variable for the OUT parameter
            result_set = cursor.var(cx_Oracle.CURSOR)
            try:
                cursor.callproc('myproc', [a, result_set])
                result_cursor = result_set.getvalue()
                # Iterate over the result set and print the values
                for row in result_cursor:
                    print(row)
                # or do fetchall()
            except Exception as e:
                print(f"Error calling myproc: {e}")
call_myproc('value_for_a')
import numpy as np
a = np.array([...])  # Note: cannot do .append() with an np array. Access the first element with a[0]
a.shape
gives the shape
Just a few reminders: a vector has one dimension, a matrix has two.
Element-wise operations: + * ** np.sqrt(a) np.log(a) np.exp(a)
Element-wise product requires that operands have the same shape.
a*b # list where each element is ai * bi
This is an element-wise product.
A + 4  # this is "broadcasting", where 4 is added to all the elements of A
np.dot(a,b) = sum(ai * bi)
Dot product, or inner product
This is a dot product of two vectors and assumes that assumes both have the same length.
Dot product of two matrices A*B requires that the inner dimensions match.
np.dot(a,b) or a.dot(b) or b.dot(a) or a@b
# note that a * b is element-wise multiplication, not the dot product
np.inner(a,b) is the same as np.dot(a,b)
np.sum(array) or a.sum()
are the sum of the elements
np.outer(a,b)      # outer product (like a cartesian product)
np.linalg.norm(a)  # the magnitude of a: |a| or sqrt(sum(a * a))
np.linalg.inv(a)   # inverse of a. a.dot(np.linalg.inv(a)) gives the identity matrix (a must be square). The identity matrix has 1 on the diagonal, 0 elsewhere.
a.T is the transpose. Note: vector.T == vector
np.linalg.det(a)   # determinant of a
np.diag(2D_array)  # diagonal: a vector of the elements on the diagonal
np.diag(1D_array)  # a 2-D array with the elements on the diagonal, and 0 elsewhere
np.trace(a)        # sum of the diagonal: np.diag(a).sum()
np.allclose(a,b)   # the difference between the two parameters is small (but not necessarily zero)
two dimensional array (using matrix() is not recommended, use array() instead)
m = np.array([ [...] , [...] ])
Access with m[i][j] or m[i,j]
By convention, the first index is row, second is column
m[:,1]     # --> all rows, second column (column 1). Read ":" as "all"
m[:,:8]    # --> all rows, first 8 columns
m[:,:-2]   # --> all rows, all but the last 2 columns
m[:,-2:]   # --> all rows, last 2 columns
ary.ndim   # --> gives the number of dimensions
ary.reshape((3,4))  # --> gives a new shape
Random generator
np.random.normal(size=n, loc=mu, scale=stdev)  # --> array of n random numbers from a normal distribution with mean mu and standard deviation stdev (scale is the standard deviation, not the variance)
np.corrcoef(x,y)          # --> correlation matrix
np.random.default_rng(n)  # --> create a random Generator seeded with n
np.random.random((n,m))   # argument is a tuple
np.random.randn(n,m)      # gaussian distribution. Note that the argument is NOT a tuple as above
.mean() .var()
np.random.multivariate_normal(mean=np.array([1,2]), cov=np.array(2x2array), size=1000)
np.mean(x) or x.mean()
np.var(x) or x.var()
np.std(x) or x.std()
Generate arrays:
np.array(a list)
np.zeros(n) or np.zeros((n,m))  # argument is a tuple
np.ones(n) or np.ones((n,m))
np.eye(n)  # identity matrix
np.linspace(start_point, end_point, number_of_points)  # create a list of points. Example: np.linspace(0,1,11)
nparray.reshape(n,m)  # with n,m as the new dimensions
eigenvalue/vector (http://setosa.io/ev/eigenvectors-and-eigenvalues/)
Convention: rows are the samples, columns are the features. E.g. np.random.randn(100,3) : 100 samples, 3 features
np.cov(a) gives the covariance (you might have to do np.cov(a.T))
Reminder: symmetric means a = a.T; hermitian means a equals its conjugate transpose
eigenvalues, eigenvectors = np.linalg.eig(a)   # returns a tuple
eigenvalues, eigenvectors = np.linalg.eigh(a)  # for symmetric and Hermitian matrices
Solve linear system Ax=b
np.linalg.inv(A).dot(b)
or np.linalg.solve(A,b)
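Small example of both forms on a made-up 2x2 system; np.allclose confirms they agree:
import numpy as np
A = np.array([[3., 1.],
              [1., 2.]])
b = np.array([9., 8.])
x1 = np.linalg.solve(A, b)    # preferred
x2 = np.linalg.inv(A).dot(b)  # same result, but slower and less stable
print(x1, np.allclose(A.dot(x1), b), np.allclose(x1, x2))  # [2. 3.] True True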
read file
X = []
for line in open("the_file.csv"):
    row = line.split(',')
    sample = list(map(float, row))
    X.append(sample)
X = np.array(X)
import numpy as np
np.select([cond1, cond2, ...], [exp1, exp2, ...], default=exp3)
np.where(cond1, exp1, np.where(cond2, exp2, ...))
# use numpy.where:
df[col3] = np.where((df[col1]-df[col2])>0, df[col1]-df[col2], 0)
# equiv to if col1>col2 then return col1-col2 else return 0
df["aaaaaa"] = np.select(
[ df["col1"] == 'a value'
, np.logical_and(df["col2"] == 'a value', df["col3"].isin(('a','b')))
, np.logical_and(df["col2"] == 'a value', ~df["col3"].isin(('a','b'))) # not in
]
,
[ df["cola"]
, df["colb"]
, df["colc"]
]
, "default"
)
Fast Fourier Transform:
Y=np.fft.fft(y)
Convert to matrix:
import pandas as pd
The conversion to a matrix is optional. Note: as_matrix() was removed from recent pandas; use to_numpy() instead.
df = pd.read_csv("the_file.csv", header=None, sep=",").to_numpy()
Y=np.fft.fft(y)
nltk.download() #
manage resources for nltk package
import pandas as pd
import numpy as np
Read to a data frame, with a previously opened connection
df = pd.read_sql("SELECT top "+str(NBR_ROWS)+" * from the_table ;", cn)
Read and write CSV:
df = pd.read_csv("the_file.csv", header=None, sep=",")
df.to_csv("filename.csv", index = False, header=True, sep="\t", compression="gzip")
Parameter header is the row with the column headers: 0 for the first row, None for no headers.
If 1 or more, the previous rows are ignored.
Parameter sep is the column separator.
index=False removes the added index column.
Note: engine="python" (default is C) to get other options
df.to_excel('filename.xlsx')  # Export data frame to Excel. Note that Excel does not handle special characters very well.
df = pd.json_normalize(something_in_json_format)  # Normalize semi-structured JSON data into a flat table (gives structure.element notation)
In the department of useless error messages:
ValueError: only single character unicode strings can be converted to Py_UCS4, got length 0
It seems that this happens when loading a csv into Excel with sep="" (empty separator)!
import sqlite3
import pandas as pd
connlite = sqlite3.connect(the_sqlite_db_name)
r = pd.read_sql_query(sql_cmd, connlite, params=sql_params) # sql_params is a tuple like (1234,)
connlite.close()
d = pd.DataFrame([['a','r','t',3,5],['a','r','t',-1,-3],['a','r','s',2,4]])
# or
df = pd.DataFrame([[1,2.2,"abcd"],[2,3.5,"efgh"],[3,6.3,"ij"]])
df.columns=["a","b","c"]
df
df.info() # gives some basics about the data
df.head(10) # shows first 10 rows
df.tail(10) # shows last 10 rows
df.columns # the columns
df.shape # see the number of rows and columns
df.columns.values
df.index.values
len(df.index) # ==> number of rows
df.values.tolist()  # Transform into a list of lists, in the form of a list of rows (as opposed to dataframes, which are organized as a list of columns). I think this can be used for bulk SQL insert (but I have to test this)
df.columns.values.tolist()  # List of column headers
[df.columns.values.tolist()] + df.values.tolist()  # Make it look like a table (verify this snippet, not sure it works)
d = pd.DataFrame({ 'n': [1,2,3,4,5,6,7,8,9,10]
, 'a': ['boat','boat','boat','plane', 'plane','boat','boat','boat','plane', 'plane',]
, 'b': ['red','red','red','blue','blue','red','red','red','blue','blue']
, 'c': ['big','big','small','small','small','big','big','small','small','small']
, 'x': [3,4,-1,5,6,-1,5,6,3,4]
, 'y': [2,-3,-2,2,1,2,2,1,0,4]})
d.groupby(by=['a','b','c'])[['x','y']].sum()
d['x'].rolling(3).sum()
d.sort_values(by=['n'], ascending=False)
d.sort_values(by=['n'], ascending=False).groupby('a')[['x','y']].rolling(3).sum()
d.sort_values(by=['n'], ascending=False).groupby(['a','b'])[['x','y']].rolling(3).sum()
Get the element on the third row of a column 'AA'. Notice square brackets.
Notice index starts at 0. This assumes reset_index or numbering 0,1,2, ...
df['AA'].iloc[2]
df.iloc[:,0]  # returns the first column. Notice the "i" in "iloc"
df.iloc[0,:]  # returns the first row. Also with "iloc"
The following leads to "chained indexing":
df = df [ ['ISIN', 'Date']]
Do this instead:
df = df.loc[:,['ISIN', 'Date']]
Note:
df.loc[:,'col']    # returns a series, not a dataframe
df.loc[:,['col']]  # returns a dataframe (two square brackets)
df[["col2", "col5", "col1", "col2"]]  # Reorder columns, repeats are allowed (notice double square brackets)
Copy only the selected columns
col_lst = ["a", "b", "c", "src"]
df_copy = df[col_lst].copy()
or
df_copy = df[["a", "b", "c", "src"]].copy()
Rename columns:
df.rename(columns={'old_col1':'new_col1', 'old_col2':'new_col2'})
Append one to another (same structure)
df = pd.concat([df1,df2])
df = pd.concat([df, df1.loc[:,non_null_raw_columns_hs]])
Do NOT do this: df = df.append(df1[non_null_raw_columns_hs])
In place, vs assign to new dataframe (however, not all functions allow "inplace")
df = df.something(...)
df.something(..., inplace=True)
Assign to all rows
df['Country'] = "aaa"
Changes to a single column
df["str_col"] = df["str_col"].str.title()
Update Existing vs New Column:
df['c'] = df['c'].str.upper()   # modify existing column as upper case of existing column
df['cc'] = df['c'].str.upper()  # add a new column as upper case of existing column
Add a field
df['newcol'] = "the_source"
df['newcol'] = df['firstcol'] + df['secondcol']
df["meas_ratio"] = df["meas1"] / df["meas2"] #
no error if null
Replace nulls with something, but only for certain rows:
columns_for_which_null_should_be_na = {"A": 0, "B": 'N.A.'} #
"A" ... are the column headers
df.fillna(value=columns_for_which_null_should_be_na)
DataFrame.fillna(value=None, method=None, axis=None, inplace=False, limit=None, downcast=None)[source]
df.isna() equiv to df.isnull()
Set the data type:
df["col"] = df["col"].astype("str")
.apply(f, axis=1)  # f is a function that should have one parameter, which is a row
Note: according to one person on the web, apply is slower than a loop
df['newcol'] = df.apply(lambda row: row['firstcol'] * row['secondcol'], axis=1)
df['Discounted_Price'] = df.apply(lambda row: row.Cost - (row.Cost * 0.1), axis=1)
Equivalent without "apply":
df['Discounted_Price'] = df['Cost'] - (0.1 * df['Cost'])
Update columns in df based on indices in df_other. DOES NOT JOIN, it just copies data in order of the index
df.update(df_other)
By default: join='left', overwrite=True
If I want to only overwrite the NA, then set overwrite=False
A filter_func allows further filtering
DataFrame.update(other, join='left', overwrite=True, filter_func=None, errors='ignore')
Parameter "on" (type str or list of str): column(s) to join. If None, then joins index-on-index
Other parameters are : lsuffix='', rsuffix='', sort=False
Options for "how" are: 'left' (default), 'right', 'outer', 'inner'
When not joined, values show as NaN
df.join(df_other, on=None, how='left')
Force a key
df.join(df_other.set_index('key'), on='key', how='left')
Uses df_other's index but any column in df: this preserves the original DataFrame's index in the result.
df.join(df_other, on="original_index", rsuffix="_joined", how="left")
df.join(df_other.set_index('col1'), on='col1', how='left')
Join two dataframes by specified columns (as opposed to join, which joins on indexes)
df.merge(df_other, how="inner", left_on=["a", "b"], right_on=["aa", "bb"], suffixes=("_left", "_right"), copy=False, indicator=True)
Parameter copy=True by default, but I am guessing False leads to fewer issues, based on the doc.
The indicator adds a column giving the source of each row.
df.merge( df_other \
, left_on = ["cola", "colb", "colc"] \
, right_on = ["colA", "colB", "colC"] \
, suffixes = ("", "_right") \
, how = "left" \
, copy = True \
)
df.set_index("col") #
The column is removed
df.set_index(["col1", "col2"]) #
The columns are removed
df.set_index("col", drop=False) #
The column is kept as a column too
df.set_index(pd.Index([..list of values..])) #
Create the index from a list of values (needs intermediate step of making it an Index object)
Keep the old index, and re-apply when needed (to be confirmed):
original_indx = df.index
df.reset_index(inplace = True)
df.set_index("original_indx")
df.reset_index(inplace=True)
For example, after concatenating dataframes, the original indexes are kept.
Otherwise the indices from each original df will show 0 multiple times, 1 multiple times, ...
Do reset_index after append so as to have a unique index.
And, as a reminder because it is not intuitive, the indexes identify the rows in Python.
df.reset_index(drop=False)  # The old index becomes a column
df[df["col"]=="aa"] #
returns rows with "aa" in col
df[len(df[a col])<4]
df[df["col"]=='a_value']
df[df["adate"]<date_limit] #
I formatted as a string yyyy-mm-dd
SQL-like queries of the dataframe
df = df.query("col < 4.5 & col > 4")
df.mask is similar to df.where, but careful: where replaces the elements for which the condition is False, while mask replaces those for which it is True.
df.isin(("val1", "val2", "val3", ...))  # ==> true where the element is in the values
df.drop(some labels)
df.drop(df[<some boolean condition>].index)
df.drop(df[df["col"]=="aa"].index) #
drops rows with aa in col
df.drop(df[..see examples under 'filter' above..].index)
df.sort_values(by=["cold"], ascending=True).groupby(by=["cola", "colb", "colc"])[["meas1", "meas2"]].sum()
.groupby sorts by default. Therefore sorting before grouping does not make sense.
Group by does three things: split a table into groups, apply some operations to each of those smaller tables (i.e. the aggregation function), and combine.
In "for g, f in df.groupby(...):", each iteration has two parts: the group key and the corresponding frame.
df.sort_values(by=["cold"], ascending=True).groupby(by="col")["meas"].sum()
.sum()
.rolling(12).sum()
.min()
.mean()        # not 'avg'
.agg('mean')   # not 'avg'
.size()        # includes NaN
.count()       # excludes NaN
.sum().nlargest(10)  # returns the 10 largest (aggregation on top of aggregation)
.nunique()           # number of unique values
.str.contains("a-str")        # Returns a true or false for each row
.str.contains("a-str").sum()  # Returns the count of rows containing "a-str"
It may be necessary to add .reset_index() because the columns of the groupby become the index.
reset_index()  # makes them columns again
df.groupby(by=["a", "b"])[["m1", "m2"]].sum().reset_index()
Same, but without group by, in other words across all rows:
df[["col1", "col2"]].agg('max')
df["col"].max()
df["col1"].unique()
Get the unique values of one column
Split a dataframe into multiple dataframes based on the value of a column:
for g, d in df.groupby('col1'):
    print(g, ":\n", d)  # g has one of the group-by values, d has the corresponding values
    # note that the original index is kept
df=dtframe.set_index(['col1', 'col2'])
df.unstack(level=n)  # where n corresponds to the index elements above (0 to index length - 1). Default -1.
df.sort_values(by="the_index", axis="index")
df.sort_values(by=["a", "b"], axis="index")
col.value_counts()  # Returns a Series, sorted by count descending
df["str_col"].str.lower()
Note that .str is a method for series: works when the data type of a column is string
df["col"].str.title() #
title case
df["col"].str.lower()
.str.contains("a-str") #
Returns a true or false for each row
import numpy as np
np.logical_and(arr1, arr2)
np.logical_or(arr1, arr2)
df.pivot cannot handle aggregation
df.pivot_table has to have numerics
df.unstack is similar
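A minimal pivot_table sketch on a made-up dataframe (the column names are just for illustration):
import pandas as pd
df = pd.DataFrame({"a": ["boat", "boat", "plane", "plane"],
                   "b": ["red", "blue", "red", "blue"],
                   "x": [1, 2, 3, 4]})
print(df.pivot_table(index="a", columns="b", values="x", aggfunc="sum", fill_value=0))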
import datetime
print(datetime.datetime.now().isoformat(sep='T', timespec='microseconds'))
import time
def show_progress(last_ts=None, label=None):
    ts = time.perf_counter()
    if last_ts:
        print("Elapsed seconds:", ts - last_ts, " Last time: ", str(last_ts), " Current time: ", str(ts), " - ", label)
    else:
        print("Current time: ", str(ts), " - ", label)
    return ts
import pyodbc
cn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+sqls_svr+';DATABASE='+sqls_db+';Trusted_Connection=yes')
csr=cn.cursor()
csr.execute("SELECT @@version as version;")
cn.close()
df = pd.read_parquet("file_or_directory")
df.to_csv("output_file", index=False, sep="\t", compression="gzip") # index=False suppresses the added index column
# compression to .gz
df['new_date'] = df.year.astype(str) + '-' + df.month.astype(str).str.zfill(2) + '-' + df.day.astype(str).str.zfill(2)
df.dtypes : returns the types
Doc:
Unless shown otherwise, all expressions shown below return a dataframe
df_new = df. . . .
df = df. . . .
df.show()
Dataset: distributed collection of items
Collection of rows
Called DataFrame, but it is not a Pandas DataFrame
A DataFrame is a Dataset organized into named columns. Conceptually equivalent to a table or a data frame in Python
Data frames are implemented on top of RDDs. RDDs are immutable and their operations are lazy.
Transformations on RDDs return new RDDs. However, nothing is calculated.
Actions, such as collect(), trigger the actual computation. Otherwise, the computation is not done.
Shared variables:
Clusters:
High concurrency
Standard (recommended for single users)
single node, just for explore
Cluster mgr: connects to all nodes
Driver: has the SparkContext
Worker Node: has an executor. The executor performs the tasks
pool: allows clusters just released to be reused
driver node can be of a different type than the worker nodes. Generally, keep the same
select the latest runtime possible (LTS)
high concurrency for shared clusters, and standard for single users
enable auto-termination
use AWS spot instances if possible
Generally, prefer fewer but larger nodes, because many operations cannot be run in parallel, instead of many smaller nodes
DBU=processing capability per hour
jobs compute: lowest rate, for automated jobs
sql compute: idem, but not always available
all-purpose: higher rate, high concurrency
spot: lower costs , but can be terminated when prices go up
Compare Spark to MapReduce (not to HDFS=Hadoop Distributed File System)
Spark can also use HDFS
Most of the improvement of Spark over MapReduce is the fact that Spark does not have to write to disk at the end of every operation
pipenv install pyspark
sudo apt install default-jre
(sudo apt install scala)
(sudo apt install py4j)
Note:
Set SPARK_LOCAL_IP if you need to bind to another address
Set export SPARK_LOCAL_IP=127.0.0.1
127.0.0.1 did not seem to work
hostname resolves to a loopback address: 127.0.1.1; used 192.168.0.100 instead (IP in local network)
Initialize with SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark = SparkSession.builder.appName('a-name').getOrCreate()
In the pyspark shell, this is not necessary as the session is automatically stored
in the spark
variable
Both lines above initialize the spark session. The second line associates it with an application.
Only one of the two lines is necessary.
Get functions:
import pyspark.sql.functions as F
As with anywhere else in Python, do dir(obj) to see the methods and help(obj) to get more details (if available).
In a jupyter notebook, do object? (the object followed by a question mark).
Get documentation on parameters:
print(an_object.explainParams().replace("\n", "\n\n"))
For individual parameters, the following gives documentation. The object can be the instance or the model.
an_object.param
Generally, set the value with set...() and get the value with get...().
df2= spark.createDataFrame([ [1, 2., 'abcd']
, [2, 3., 'efgh']
, [3, 5., 'ij']
]
, schema='a long, b double, c string'
)
from pyspark.sql import Row
df = spark.createDataFrame([ Row(a=1, b=2., c='abcd')
, Row(a=2, b=3., c='efgh')
, Row(a=3, b=5., c='ij')
]
, schema='a long, b double, c string'
)
df = spark.createDataFrame([
['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()
When all else fails, take apart a data frame:
for one_row in tokenized.select(["words",]).collect():
    print(one_row["a_column_name"])
Extract column values:
for one_col in one_row:
    print("Column in one row:", type(one_col), one_col)
df.count()
df.show(truncate=False)
df.value.contains("Sp"))
(verify syntax; is "contains" only for pandas?)
df.first()
.cache()  # cache the data for each re-use
See the structure with: df.printSchema()
from pyspark.sql.types import StructField, StructType, StringType, TimestampType, IntegerType, BooleanType, DoubleType
data_schema = [StructField('age', IntegerType(),True) # name, type, boolean as nullable (True means nullable)
,StructField('name', StringType(), True)
]
final_struct = StructType(fields=data_schema)
df = spark.read.json('....', schema=final_struct)
# summary
df = spark.read.json('....', schema=StructType(fields=[StructField('age', IntegerType(),True) # name, type, boolean as nullable
,StructField('name', StringType(), True)
]))
Note that letting pyspark infer the schema usually works better.
df.printSchema() #
also df.schema, but this is for passing as an argument
df.dtypes
gives a list of tuples (column_name, type)
df.columns #
show columns (this is an attribute, not a method). See also under "Columns"
df.describe().show() #
See also below
df.head() #
See also under "Get Rows"
df.show(truncate=False)
df.select(df.columns).describe().show()
or just:
df.describe().show()
(this may only show numeric columns)
Format the numbers and the headers:
dscr = df.describe()
dscr.select( dscr["summary"]
, f.format_number(dscr["Open"].cast("float"), 2).alias("open")
, f.format_number(dscr["Close"].cast("float"), 2).alias("close")
, f.format_number(dscr["vol"].cast("int"), 2).alias("vol")
).show()
Count values (profiling)
for c in df.columns:
dfc = df.groupBy(c).count()
# print(c, ": ", df.select(c).distinct().count(), sep="") this is the same as next
print(c, ": ", dfc.count(), sep="")
dfc.orderBy(dfc["COUNT"].desc()).show(5)
Count distinct values for all columns:
for c in df.columns:
di = df.select(c).distinct()
if di.count() < 6:
# if not too many distinct values, then show them
print(c, ":") #, str([v for v in di.values()]))
df.groupBy(c).count().show()
else:
# otherwise, show just the count
print(c, ": ", df.select(c).distinct().count(), sep="")
Count all rows:
print(df.count())
Equivalent to SELECT DISTINCT
df.select(["CNTRY_NM", "CNTRY_CD"]).distinct()
Equivalent to SELECT COUNT(DISTINCT ...)
df.select(["CNTRY_NM", "CNTRY_CD"]).distinct().count() # distinct().count() returns a number
df.show()
df.show(1) #
top 1 row
df.show(1, vertical=True) #
Display vertically. Useful for long values
df.head(n) #
list of row objects (number optional)
df.take(n) #
list of row objects (same as head)
df.tail(1) #
list of row objects
df.head(n) -->
list of row objects (number optional)
df.head(n)[0] -->
row object
df.head(n)[0][0] -->
first value
for row in df.head(n):.... #
in case a loop is really necessary...
result = df.filter(...).collect() #
use this to keep the data for future use. List of row objects.
result[0] #
First row as row object
result[0].asDict()
-> dictionary for one row
(where result = ....collect()
as shown above)
Most useful:
df.head().col --> value in column 'col' on the first row
df.head(2)[1].col --> value in column 'col' on the 2nd row
for a in df.head(n)
a.col --> value in column 'col' in the first n rows (also tail)
df['col'] --> column object
df.col --> column object (dot notation will not work for field names with spaces, or names like reserved words)
df.select('col') --> dataframe with a single column (and I can do .show())
df.select(['col','col2']) --> dataframe, here with two columns
(df.select(df.col) --> dataframe with column selected via its column object. Better to use .select([...]))
(df.select(df.col1, df.col2) --> dataframe with two column objects. Better to use .select([...]))
Column objects don't really give you data. If you want to see the data for a column object "col", put it back into the dataframe like this: "df.select(col).show()"
Add columns:
df2 = df.withColumn('cc', upper(df.c))
# 'cc' is new col name, second argument is a column object (not a dataframe object)
df2 = df.withColumn("Ratio", df["A"]/df["B"]).select(f.format_number("Ratio",2)) #
Add column and format
Rename columns:
df2 = df.withColumnRenamed('old_name', 'new_name') #
just rename
Alias:
df.select(avg('col').alias('alias-to-display'))
Drop column:
df.drop('col')
import pyspark.sql.types as T
df.a.cast(T.StringType())
Unless shown otherwise, all expressions shown below return a dataframe
import pyspark.sql.functions as F
F.upper(...) or str.upper()
upper, lower, length, ascii, base64, unbase64, trim, ltrim, rtrim, instr(str,substr), substring(str,pos,len), split(str,pattern,limit=-1)
concat, concat_ws(sep,*cols)
format_number(col,d) #
where d is the number of decimals
format_string(format,*cols)
F.lit(n)
or F.lit("a string")
: spreads the literal down the columns
df.select(F.cos(df["col"])+F.lit(2))
: calculation on a column
df.select(F.to_date(F.concat_ws("", F.lit("2024/01/"), F.to_char(df["v1"], F.lit("09"))), "yyyy/MM/dd"))
: calculation on a column. Notice lit(format) for to_char, but not for to_date.
from pyspark.sql.functions import dayofmonth,month,year,weekofyear,dayofweek
from pyspark.sql.functions import hour,dayofyear
from pyspark.sql.functions import format_number,date_format
df.select(['Date', dayofmonth(df['Date'])]).show()
Date stored as : datetime.datetime(2022,4,25,0,0)
df.withColumn("Year",year(df['aDate'])).show() #
add a column
size
split
Keep as nulls
Or drop the rows
df.na.drop() #
drop any row with any null
df.na.drop(thresh=2) #
drop rows that have fewer than 2 non-null values (keep rows with at least 2)
df.na.drop(how='all') #
drop if all are null
df.na.drop(subset=['col']) #
drop if values in col are null
Or fill in with values
df.na.fill(0) #
replace null with 0 in numeric columns
df.na.fill('') #
replace null with '' in string columns
df.na.fill('n/a', subset=['col'])
Example of filling the missing values with the average value of the column:
import pyspark.sql.functions as f
avg_value = df.select(f.mean(df['the_column'])).collect()[0][0]
df.na.fill(avg_value, subset=['the_column'])
Equivalent to NVL
(See "CASE WHEN ELSE"
for replacing with a value from another column)
df_countries = df.select(["CNTRY_NM", "CNTRY_CD", "CRCD"]).distinct().na.fill("NULL", subset=["CNTRY_CD"])
one_df.join(other_df, on=None, how=None)
on: str, list, or Column
a string for the join column name, a list of column names, a join expression (Column), or a list of Columns
how (string): inner (default), cross, outer, full, fullouter, full_outer,
left, leftouter, left_outer, right, rightouter, right_outer, semi,
leftsemi, left_semi, anti, leftanti and left_anti
examples for "on":
name #
column must be on both sides
one_df.name == other_df.name
[one_df.name == other_df.name, one_df.dob == other_df.dob]
Equivalent to JOIN
df_crcd.join(df_avg_funding_rcvd, on=["CRCD", "YEAR"], how="left").join(df_avg_tgt_peo, on=["CRCD", "YEAR"], how="left")
Concatenate two dataframes (vertically, with same structure):
df_new = df1.union(df2)
Unless shown otherwise, all expressions shown below return a dataframe
df.filter(df.a == 1).show() #
column object in the filter
df.filter("a = 1").show() #
condition in sql syntax
When combining conditions, use "&" "|" "~"
for "and", "or", and "not"
and surround each individual condition with "(...)"
These two are equivalent:
df.filter("Close < 60").count()
df.filter(df["Close"] < 60).count()
And these two:
100.0 * df.filter("High > 80").count()/df.count()
100.0 * df.filter(df["High"] > 80).count()/df.count()
Equivalent to CASE WHEN ELSE. Use multiple .when if needed (see the sketch below).
from pyspark.sql.functions import when
df = df.withColumn("CRCD", when(df["CD"].isNull(), df["NM"]).otherwise(df["CD"]))
Another option, not tried:
from pyspark.sql.functions import coalesce
df.withColumn("B",coalesce(df.B,df.A))
Equivalent to WHERE
df.filter(df["CNTRY_NM"] == "Mali").show()
Remove rows where value is 0
df.filter(df["AVG_FUNDING"] != 0).filter(df["AVG_PEOPLE"] != 0)
df.orderBy('col').show() #
ascending
df.orderBy(df['col'].desc()).show() #
descending
df.orderBy('col', ascending=True).show() #
ascending
df.orderBy('col', ascending=False).show() #
descending
df.orderBy(df["High"].desc()).head(1)[0][0]
df.groupby('col_dim').avg('meas1','meas2').show()
The .groupby()
returns a GroupedData object, and the aggregation methods
return dataframes:
i.e. a.groupby("col").mean()
returns a dataframe.
Other aggregates: avg(), max(), count(), sum()
(all with ())
df.groupBy("col").count().orderBy("count")
# show counts by col, and sort
Aggregate without group by:
The argument is a dictionary, of which the
keys are the column names and values are the desired aggregation
df.agg({'col': 'sum'}).show()
Note that the doc says df.agg(...)
"shorthand for"
df.groupby().agg(...)
i.e. using an empty groupby method
Combine the two methods described above (groupby and agg):
df.groupby('col_dim').agg({'col': 'sum'}).show()
Another option:
from pyspark.sql.functions import countDistinct,avg,stddev
df.select(avg('Sales').alias('Avg Sales')).show()
Pyspark.sql.GroupedData functions: agg, apply, applyInPandas, count, pivot. And avg, mean, max, min, sum
df.select(f.max("Volume"), f.min("Volume")).show()
# this does aggregation with the functions max and min
Get the max (or other aggregate value):
df.agg({"col_name": "max"}).head(1)[0][0]
Pivot:
df.groupBy(...).pivot("pivot-column", [list of values that will become columns]).aggr_fctn
If the list of values is not provided, the process first determines the list.
It is therefore more efficient to provide the list if it is known
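For example, with the color/fruit dataframe created earlier, this pivots the fruit values into columns (sketch):
df.groupBy("color").pivot("fruit", ["banana", "carrot", "grape"]).sum("v1").show()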
Unpivot:
from pyspark.sql import functions as F
stack_expr = "stack(number-of-columns, 'col1', col1, 'col2', col2, ... , 'coln', coln) as (col-where-cols-will-go, col-where-values-will-go)"
un_pivot_df = df.select("non-pivoted-col", F.expr(stack_expr)).where("col-where-values-will-go is not null")
The ".where(...)
" removes the rows with null and can be ommitted
df.filter(...).select(['c1', 'c2']).groupby('col_dim').avg('meas1','meas2').show()
???: df.orderBy(...).filter(...).select(['c1', 'c2']).groupby('col_dim').avg('meas1','meas2').show()
df.select(['c1', 'c2']).where("`c1` like 'abc%'")
(notice the backticks around the column name in the where)
df.filter(df["High"] == df.agg({"High": "max"}).head(1)[0][0]).select("Date").head(1)[0][0]
df.withColumn("Year", f.year(df["Date"])).groupby("Year").max("High").orderBy("Year").show()
df.withColumn("Month", f.month(df["Date"])).groupby("Month").avg("Close").orderBy("Month").show()
Select the date where a column has the maximum value:
df.filter(df["High"] == df.agg({"High": "max"}).head(1)[0][0]).select("Date").show()
Equivalent to WHERE, GROUP BY, AVG, AS
(renamed columns, alias)
Notice parentheses and operands of or ("|"), and ("&"), and not ("~")
df_avg_tgt_peo = df.filter((df["METRIC"] == "People in need") | (df["METRIC"] == "People targeted")).groupBy(["YEAR", "CRCD"]).avg("VALUE").withColumnRenamed("AVG(VALUE)", "AVG_PEOPLE")
Make the data frame look like a table:
df.createOrReplaceTempView("tablea")
("registers" the dataframe as a table)
Now, leverage the SQL syntax to perform queries.
Remember that, as with other spark operations, the data is not evaluated until a .show()
or
other action is performed.
spark.sql("SELECT count(*) from tablea").show()
dfnew = spark.sql("SELECT count(*) from tablea")
Some Equivalents
PySpark | spark.sql(...)
---|---
.groupBy("prediction").count().show() | select count(*), prediction from tb group by prediction
Generally, no additional imports needed
Text file:
textFile = spark.read.text("zen.txt")
df.withColumn('o', F.concat_ws('|', df.aa, df.b)).select('o').write.text('abc.txt')
CSV file:
df.write.csv('ff.csv', header=True)
spark.read.csv('ff.csv', inferSchema=True, header=True).show()
Parquet file:
df.write.parquet('ff.parquet')
spark.read.parquet('ff.parquet').show()
Patterns:
Pattern with just transform:
from pyspark.sql.... import TheClass
the_class_inst = TheClass(...)
the_class_out = the_class_inst.transform(input_data)
Pattern with fit and transform:
from pyspark.sql.... import TheClass
the_class_inst = TheClass(...)
the_class_model = the_class_inst.fit(input_data)
the_class_out = the_class_model.transform(input_or_test_data)
Pattern with fit and transform on same line:
from pyspark.sql.... import TheClass
the_class_inst = TheClass(...)
the_class_out = the_class_inst.fit(input_data).transform(input_data)
Split between training and test data
trn_dta, tst_dta = dta.randomSplit([0.7, 0.3])
VectorAssembler: assemble a vector column from multiple columns
from pyspark.ml.feature import VectorAssembler
inst = VectorAssembler(inputCols=["...", "...", ], outputCol = "features")
# notice plural in inputCols; the parameter is a list
data_out = inst.transform(input_data)
Min-max Scaler: normalize the features so that one is not weighted more than others just because of the scale
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame(
[(1, Vectors.dense([0.2, 2043, 30])),
(2, Vectors.dense([0.3, 2187, 33])),
(3, Vectors.dense([0.1, 2209, 29])),
],
["id", "input_data"]
)
scaler_inst = MinMaxScaler(inputCol="input_data", outputCol="features")
scaler_model = scaler_inst.fit(df)
scaler_out = scaler_model.transform(df)
Standardizer: standardize, so that data has mean 0 and variance 1.
Note that a few values may be less than -1 or greater than 1.
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame(
[(1, Vectors.dense([0.2, 2043, 30])),
(2, Vectors.dense([0.3, 2187, 33])),
(3, Vectors.dense([0.1, 2209, 29])),
],
["id", "input_data"]
)
scaler_inst = StandardScaler(inputCol="input_data", outputCol="features", withMean=True, withStd=True)
scaler_model = scaler_inst.fit(df)
scaler_out = scaler_model.transform(df)
Bucketizer: group data into buckets, based on splits
from pyspark.ml.feature import Bucketizer
splits = [
-float("inf"),
-5.0,
0.0,
5.0,
float("inf"),
]
df = spark.createDataFrame(
    [(-6.0,),
     (-3.1,),
     (-2.4,),
     ( 0.0,),
     ( 1.5,),
     ( 2.4,),
     ( 7.0,),
    ],
    ["input_data"]
)
bucktz_inst = Bucketizer(splits=splits, inputCol="input_data", outputCol="features")
bucktz_out = bucktz_inst.transform(df)
StringIndexer: transform strings into indexes, because pyspark ML cannot take strings
from pyspark.ml.feature import StringIndexer
indexer_inst = StringIndexer(inputCol="nm", outputCol="nm")
indexer_model = indexer_inst.fit(input_data)
indexer_out = indexer_model.transform(input_data)
StandardScaler: scale data so that large values do not skew
from pyspark.ml.feature import StandardScaler
scaler_inst = StandardScaler(inputCol="nm", outputCol="nm", withMean=True, withStd=True)
scaler_model = scaler_inst.fit(input_data)
scaler_out = scaler_model.transform(input_data) # notice that the same input data is used for fitting and transforming
Tokenizer: separate a text into a list of words. Note that punctuation has to be removed in a separate operation
from pyspark.ml.feature import Tokenizer
df = spark.createDataFrame( # "Tale of Two Cities" by Charles Dickens
[(1, "It was the best of times, it was the worst of times, "),
(2, "it was the age of wisdom, it was the age of foolishness, "),
(3, "it was the epoch of belief, it was the epoch of incredulity, "),
(4, "it was the season of Light, it was the season of Darkness, "),
],
["id", "sentence"]
)
t = Tokenizer(inputCol="sentence", outputCol="words")
output_data = t.transform(df)
RegexTokenizer: Use regex to separate into words
from pyspark.ml.feature import RegexTokenizer
t = RegexTokenizer(inputCol="nm", outputCol="nm", pattern="\\W")
output_data = t.transform(input_data)
StopWordsRemover: remove stop words
from pyspark.ml.feature import StopWordsRemover
r = StopWordsRemover(inputCol="nm", outputCol="nm")
output_data = r.transform(input_data)
N-Grams
from pyspark.ml.feature import NGram
n = NGram(n=2, inputCol="nm", outputCol="nm")
output_data = n.transform(input_data)
TF IDF: term frequency, inverse document frequency. The CountVectorizer can also be used. Requires tokenized data.
from pyspark.ml.feature import HashingTF, IDF
tf = HashingTF(inputCol="tokenized_words", outputCol="nm_out1", numFeatures=20) # numFeatures is the number of features, not mandatory. Usually much larger
output1_data = tf.transform(input_data)
idf = IDF(inputCol="nm_out1", outputCol="nm")
idf_model = idf.fit(output1_data)
output2_data = idf_model.transform(output1_data)
Count Vectorizer
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="nm", outputCol="nm", vocabSize=20, minDF=2)
# vocabSize is vocabulary size, approx 20 (I did not count)
# minDF is the minimum frequency for the word to be taken into account. Here, with 2, each word has to be in at least 2 documents(???)
model = cv.fit(input_data)
output_data = model.transform(input_data)
Pipeline
from pyspark.ml import Pipeline
pipeline_inst = Pipeline(stages=[inst1, inst2, ])
pipeline_model = pipeline_inst.fit(input_data)
output_data = pipeline_model.transform(input_data)
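A minimal pipeline sketch combining some of the feature stages above, assuming a dataframe with a 'sentence' column like the Tokenizer example:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF
tok = Tokenizer(inputCol="sentence", outputCol="words")
swr = StopWordsRemover(inputCol="words", outputCol="clean_words")
tf = HashingTF(inputCol="clean_words", outputCol="features", numFeatures=1000)
pipeline_model = Pipeline(stages=[tok, swr, tf]).fit(df)
pipeline_model.transform(df).select("features").show(truncate=False)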
Linear Regression Usually the best starting model for predicting numerical data
from pyspark.ml.regression import LinearRegression
lr_inst = LinearRegression(maxIter=10, featuresCol="nm", labelCol="nm", predictionCol="nm", regParam=0.3, elasticNetParam=0.8)
lr_model = lr_inst.fit(trn_dta)
lr_model.coefficients # if close to zero (and values not too large), then the corresponding variable does not contribute much
lr_model.intercept
lr_pred_dta = lr_model.transform(dta)
Decision Tree Regression
from pyspark.ml.regression import DecisionTreeRegressor
dtr_inst = DecisionTreeRegressor(featuresCol="nm", labelCol="nm")
dtr_model = dtr_inst.fit(trn_dta)
dtr_pred_dta = dtr_model.transform(dta)
Gradient Boosted Tree Regression
from pyspark.ml.regression import GBTRegressor
gbtr_inst = GBTRegressor(featuresCol="nm", labelCol="nm")
gbtr_model = gbtr_inst.fit(trn_dta)
gbtr_pred_dta = gbtr_model.transform(dta)
tst_rslt = lr_model.evaluate(tst_dta)
tst_rslt.r2
tst_rslt.meanAbsoluteError
tst_rslt.rootMeanSquaredError
tst_rslt.meanSquaredError
lr_model.summary.r2
lr_model.summary.meanAbsoluteError
lr_model.summary.rootMeanSquaredError
lr_model.summary.meanSquaredError
lr_model.summary.rootMeanSquaredError / lr_model.intercept # error relative to scale
Regression Evaluator
from pyspark.ml.evaluation import RegressionEvaluator
rmse_eval = RegressionEvaluator(labelCol="nm", predictionCol="nm", metricName="rmse")
rmse = rmse_eval.evaluate(pred_dta)
lr_inst = LinearRegression(maxIter=10, featuresCol="nm", labelCol="nm", predictionCol="nm", regParam=0.3, elasticNetParam=0.8)
lr_model = lr_inst.fit(trn_dta)
lr_model.coefficients # if close to zero (and values not too large), then the corresponding variable does not contribute much
lr_model.intercept
pred_dta = lr_model.transform(tst_dta)
Logistic Regression
from pyspark.ml.classification import LogisticRegression
lr_inst = LogisticRegression(featuresCol="nm", labelCol="nm", predictionCol="nm")
lr_model = lr_inst.fit(trn_dta)
lr_model.coefficients
tst_rslt = lr_model.transform(tst_dta)
pred = lr_model.transform(dta)
Naive Bayes
from pyspark.ml.classification import NaiveBayes
nb_inst = NaiveBayes(featuresCol="feature", labelCol="label", predictionCol="prediction", modelType="multinomial")
# same pattern with model = inst.fit(); rslt = model.transform()
# Multinomial model type when there are multiple categories
Decision Trees
from pyspark.ml.classification import DecisionTreeClassifier
dtc = DecisionTreeClassifier(featuresCol="nm", labelCol="nm", predictionCol="nm", maxDepth=5, maxBins=32)
# same pattern with model = inst.fit(); rslt = model.transform()
dtc_model.numNodes
dtc_model.featureImportances
Random Forest
from pyspark.ml.classification import RandomForestClassifier
rtc = RandomForestClassifier(featuresCol="nm", labelCol="nm", predictionCol="nm", numTrees=20)
# same pattern with model = inst.fit(); rslt = model.transform()
GBT Classifier
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(featuresCol="nm", labelCol="nm", predictionCol="nm")
# same pattern with model = inst.fit(); rslt = model.transform()
Perceptron Type of neural network
from pyspark.ml.classification import MultilayerPerceptronClassifier
layers = [n_inputs, 5, 5, n_categories]
# layers is list of number of neurons per layer.
# First layer: same as the number of inputs
# Last layer: same as the number of categories
mlp_inst = MultilayerPerceptronClassifier(layers=layers)
# same pattern with model = inst.fit(); rslt = model.transform()
Binary Classifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
bin_clsf_inst = BinaryClassificationEvaluator(rawPredictionCol="nm", labelCol = "nm")
bin_clsf_inst.evaluate(tst_rslt)
Multi-class classifier (offers more metrics)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
precision_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="precisionByLabel")
recall_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="recallByLabel")
true_pos_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="truePositiveRateByLabel")
false_pos_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="falsePositiveRateByLabel")
acc = acc_eval.evaluate(test_results)
precision = precision_eval.evaluate(test_results)
recall = recall_eval.evaluate(test_results)
true_pos_rate = true_pos_eval.evaluate(test_results)
false_pos_rate = false_pos_eval.evaluate(test_results)
Code for Confusion Matrix
See https://en.wikipedia.org/wiki/Confusion_matrix
test_results.createOrReplaceTempView("test_result")
the_sql = """
select count(*)
, label
, prediction
from test_result
group by label, prediction
order by label, prediction
"""
pred_val = spark.sql(the_sql)
pred_val.show()
for one_row in pred_val.collect():
if one_row["label"] == 0 and one_row["prediction"] == 0:
true_neg = one_row["count(1)"]
if one_row["label"] == 0 and one_row["prediction"] == 1:
false_pos = one_row["count(1)"]
if one_row["label"] == 1 and one_row["prediction"] == 0:
false_neg = one_row["count(1)"]
if one_row["label"] == 1 and one_row["prediction"] == 1:
true_pos = one_row["count(1)"]
print(f"|{true_neg+true_pos+false_neg+false_pos:8d}|pred pos|pred neg|")
print(f"|actu pos|{true_pos:8d}|{false_neg:8d}| (true pos |false neg)")
print(f"|actu neg|{false_pos:8d}|{true_neg:8d}| (false pos|true neg )")
print(f" ")
print(f"True positive rate, recall, sensitivity: {true_pos/(true_pos+false_neg)}")
print(f"False negative rate: {false_neg/(true_pos+false_neg)}")
print(f"False positive rate, probability of false alarm, fall-out: {false_pos/(false_pos+true_neg)}")
print(f"True negative rate, specificity, selectivity: {true_neg/(false_pos+true_neg)}")
print(f" ")
print(f"Positive predictive value, precision: {true_pos/(true_pos+false_pos)}")
print(f"False omission rate: {false_neg/(false_neg+true_neg)}")
print(f"False discovery rate: {false_pos/(true_pos+false_pos)}")
print(f"Negative predictive value: {true_neg/(false_neg+true_neg)}")
print(f" ")
print(f"Accuracy: {(true_pos + true_neg)/(true_neg+true_pos+false_neg+false_pos)}")
K Means Find clusters
from pyspark.ml.clustering import KMeans
kmeans_inst = KMeans(featuresCol="nm", k=2, seed=1) # seed is optional
kmeans_model = kmeans_inst.fit(input_dta)
centers = kmeans_model.clusterCenters() # results in list of arrays
pred = kmeans_model.transform(input_dta) # no labels, so no training vs test
Bisecting K Means Find clusters, with better performance for large data sets
from pyspark.ml.clustering import BisectingKMeans
bkmeans_inst = BisectingKMeans(featuresCol="nm", k=2, seed=1) # seed is optional
bkmeans_model = bkmeans_inst.fit(input_dta)
centers = bkmeans_model.clusterCenters() # results in list of arrays
pred = bkmeans_model.transform(input_dta) # no labels, so no training vs test
abc_model.save("filename.mdl") # no standard extension
abc_model = AbcClassName.load("filename.mdl")
df.toPandas() #
this can cause an out-of-memory error
Note that Pandas DataFrames are "eagerly evaluated", which means that all the data has to fit in memory
import pyspark.pandas as ps
psdf = ps.DataFrame({'id': range(10)}).sort_values(by="id")
psdf.spark.explain()
This is like an explain plan
"Exchange" means that the nodes swap data. Ideally, we do not want this.
"Exchange SinglePartition" means it is using only one partition. Ideally, we want to use all.
def plus_mean(pandas_df):
return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())
df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()
https://www.udemy.com/course/spark-and-python-for-big-data-with-pyspark/learn/lecture/5856256?src=sac&kw=spar#overview
Spark and Python for Big Data with PySpark
https://databricks.com/try-databricks
Training: https://academy.databricks.com/
user guide: https://docs.databricks.com/user-guide/index.html
Community edition, personal
Create a cluster, then create a notebook. Start with "import pyspark"
upload a file
df = sqlContext.sql("select * from the_table")
New workspace
Basic Notebook:
import pyspark
df = sqlContext.sql("select * from . . .;")
# Databricks creates the context automatically
Create EC2 instance, all traffic (restricted to my IP)
Run these on the instance:
sudo apt update
sudo apt install python3-pip
sudo apt install default-jre
java -version
sudo apt install scala
scala -version
pip install pyspark
pip install jupyter
Run these on the instance:
If the pip install pyspark does not work, try these:
pip install py4j
go to spark.apache.org then downloads.
wget https://www.apache.org/dyn/closer.lua/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
on EC2 instance:
jupyter notebook --generate-config
sudo openssl req -x509 -nodes -days 365 -newkey rsa:1024 -keyout ~/.ssh/forjupytercert.pem -out ~/.ssh/forjupyterkey.pem
vi ~/.jupyter/. . .. .py # the config file
# add the following:
c = get_config()
c.NotebookApp.certfile = u'/home/ubuntu/.ssh/forjupytercert.pem'
c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
c.NotebookApp.port = 8888
# start the jupyter notebook
# and modify the link to put the IP address of the EC2 instance
I encountered the following issues:
Minimal lambda function:
import json

def lambda_handler(event, context):
print("event::", event)
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
PATH variable includes specific folders in the /opt directory.
Layer paths for each Lambda runtime
Python:
Zip into a zip file:
python.zip
lambda_function.py
other_module.py
python/abc.py
python/def.py
The zip file can have any name,
but it must have a python
sub-directory.
The main executable is lambda_function.py
. Other files can exist at the same level.
Import any additional module as import another_module
Zip into a zip file:
python.zip
python/abc.py
python/def.py
python/requests # all files for "requests" package
python/another_package
The zip file can have any name,
but it must have a python
sub-directory.
Upload to a custom layer.
Then, in lambda function, do: import abc, def
Note: abc.py
can call other libraries, such as pandas, as long as pandas is in a layer
Create a layer for packages
Remember to update the version in the lambda functions using the layer.
The packages have to be downloaded in the same environment as AWS Linux. I successfully did this by spinning up an AWS Linux instance.
pip install virtualenv
virtualenv -p /usr/bin/python3.8 py38something
cd py38something
source bin/activate
Install with the --target option (see note below): pip install -t ./python library_name
zip -r a_file_name.zip ./python
The zip must contain the sub-directory "python"
deactivate
Use the "--target"
option so as to isolate the desired package and its dependencies.
Installing in the default location (lib/python3.8/site-packages/
and lib64/python3.8/site-packages/
)
will include the setuptools
and the wheel
and possibly other packages, and it will split
the dependencies between lib
and lib64
.
If I am creating the main deployment package, then add the lambda_function.py
function to the root of the zip.
The zip files should be less than 50MB in size. Otherwise, upload to the function from an S3 bucket.
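A sketch of publishing the layer zip with boto3 (the layer name is made up; for larger zips point Content at an S3 bucket/key instead of ZipFile):
import boto3
lmb = boto3.client("lambda")
with open("a_file_name.zip", "rb") as f:
    rsp = lmb.publish_layer_version(
        LayerName="my_layer",                 # hypothetical layer name
        Content={"ZipFile": f.read()},        # or {"S3Bucket": "...", "S3Key": "..."} for large zips
        CompatibleRuntimes=["python3.8"],
    )
print(rsp["LayerVersionArn"])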
s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
rsp = bucket.objects.all()
# or
s3 = boto3.client("s3")
rsp = s3.list_objects(Bucket=bucket_name)
s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
object = bucket.put_object(
Body=data_string_containing_the_data.encode("utf-8"),
Key=the_key_meaning_the_file_name
)
s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
rsp = bucket.upload_file(file_name, object_name)
# or
s3 = boto3.client("s3")
rsp = s3.upload_file(file_name, bucket_name, object_name)
s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
returned_data = io.BytesIO()
bucket.download_fileobj(Key=the_key_meaning_the_file_name, Fileobj=returned_data)
print("returned data:\n", returned_data.getvalue())
returned_data.close()
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::the_bucket_name"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject"
],
"Resource": [
"arn:aws:s3::: the_bucket_name/*"
]
}
]
}
Notice: ListBucket
on bucket, PutObject
and GetObject
on objects, meaning bucket followed by asterisk.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame # needed for fromDF()
Show status: put in cell
%status
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
sc = SparkContext()
glueContext = GlueContext(sc)
Read via catalog:
glue_dyfr = glueContext.create_dynamic_frame.from_catalog(database="..", table_name="..")
glue_dyfr.printSchema()
Count rows:
glue_dyfr.count()
Drop a field and rename another:
glue_dyfr = glue_dyfr.drop_fields(['col2', 'col3']).rename_field('col0', 'new_name')
Map columns:
col_mappings=[
('a_col', "string", 'a_col', "string"),
("col1", "string", "Country Code", "string"),
("col2", "string", "Indicator Name", "string"),
]
glue_dyfr = ApplyMapping.apply(frame = glue_dyfr, mappings = col_mappings)
Write to csv file:
glueContext.write_dynamic_frame.from_options(frame = glue_dyfr,
connection_type = "s3",
connection_options = {"path": "s3://chrisbucket41/gluej-exercise/tgt/"},
format = "csv")
Select a subset of columns and show data:
glue_dyfr.select_fields(['a_col']).toDF().distinct().show()
Convert to PySpark DataFrame:
pyspark_df = glue_dyfr.toDF()
Convert back to Glue DynamicFrame:
from awsglue.dynamicframe import DynamicFrame
glue_dyfr = DynamicFrame.fromDF(pyspark_df, glueContext, "a-name")
See also PySpark section for more details
Filter with SQL-like syntax:
pyspark_df = pyspark_df.where("`a_col` != 'Not classified'") # note the backticks around the column name
pyspark_df2 = pyspark_df2.filter((pyspark_df2['a_col'] != 'Country Name') & (pyspark_df2['a_col'] != 'Not classified'))
Unpivot:
from pyspark.sql import functions as F
unpiv_df = pyspark_df.select('a_col', F.expr(" num, 'col', col, ..."))
Aggregation:
c = unpiv_df.groupby('a_col').avg("pop")
Rename a column:
c = c.withColumnRenamed('avg(pop)', 'avg_pop')
Join:
pyspark_df = pyspark_df.join(c,'a_col',"left")
Count rows with SQL syntax:
pyspark_df.createOrReplaceTempView("countries")
spark.sql("select count(*) from countries").show()
Convert to a list of lists:
pyspark_df.select(yr_cols).collect()[0]
job_name = "job_from_workbook"
import boto3
glue = boto3.client(service_name='glue', region_name='us-east-1', endpoint_url='https://glue.us-east-1.amazonaws.com')
myNewJobRun = glue.start_job_run(JobName=job_name) # This starts the job
print(glue.get_job_run(JobName=job_name, RunId=myNewJobRun['JobRunId'])['JobRun']['JobRunState'])
Upload script file with UI or with:
aws s3 cp job.py s3://bucket/folder/
Start job in UI or with:
aws glue start-job-run --job-name "job_name"
Get job progress in UI or with:
aws glue get-job-run --job-name "job_name" --run-id "j..."
Get the run-id from the return when the job was started
Put this in the first cell:
%region us-east-1
%iam_role arn:aws:iam::aws-account-id:role/AWSGlueServiceRoleDev
%worker_type G.1X
%number_of_workers 2
%idle_timeout 30
Anaconda is natively available inside Snowflake.
https://developers.snowflake.com
test query:
select current_version()
The account is the first part of the URL provided at sign-up (the part before ".snowflakecomputing.com"
)
select current_version() as v;
CREATE WAREHOUSE IF NOT EXISTS whname; -- X-Small by default (this is the smallest)
USE WAREHOUSE whname;
CREATE DATABASE IF NOT EXISTS dbname;
USE DATABASE dbname;
CREATE SCHEMA IF NOT EXISTS schname;
CREATE OR REPLACE TABLE schname.tbname(. . .);
CREATE TABLE IF NOT EXISTS schname.tbname(. . .);
ALTER WAREHOUSE IF EXISTS whname RESUME IF SUSPENDED; -- To start up the warehouse
ALTER WAREHOUSE whname SUSPEND; -- To suspend the warehouse
CREATE OR REPLACE FILE FORMAT a_format_name
TYPE = 'CSV'
FIELD_DELIMITER = ';'
SKIP_HEADER = 1
-- or
TYPE = 'JSON'
STRIP_OUTER_ARRAY = TRUE
;
-- temporary json table
CREATE OR REPLACE TEMPORARY TABLE a_json_table (json_data VARIANT);
-- Create stage (recommended if loading often from the same location)
-- It is probably best to put the schema name too
CREATE OR REPLACE STAGE sch_nm.stage_name FILE_FORMAT = a_format_name;
For PUT
and LIST
, see Snowpark below.
installed python version 3.9
installed virtualenv
ran python -m pip install -r requirements_39.reqs
requirements taken from page:
https://github.com/snowflakedb/snowflake-connector-python/tree/main/tested_requirements
Snowflake Snippet
import snowflake.connector as sc
with sc.connect(
account = "abc"
, user = un
, password = pw
, warehouse = "WH"
, database = "DB"
) as cn:
sql = "select current_version()"
with cn.cursor() as cr:
cr.execute(sql)
r = cr.fetchone()
print(r[0])
Alternate
import snowflake.connector
params = {"account": . . ., "user": . . ., "password": . . .,
"warehouse": . . ., "database": . . .}
with snowflake.connector.connect(**params) as cn:
with cn.cursor() as cr:
cr.execute("select * from . . .")
for r in cr.fetchall():
print(r)
Write to a table:
df.write.mode("overwrite").save_as_table("table1")
Create a view:
df.create_or_replace_view(f"{database}.{schema}.{view_name}")
Select json data elements:
select col_name:json_base_element.elmt.elmt :: float, . . . . from . . .;
Types float, int, . . .
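A sketch using the cursor from the connector snippet above and the temporary table a_json_table created earlier; the json element names are made up:
sql = "select json_data:company.name::string as company_name from a_json_table"
cr.execute(sql)
print(cr.fetchall())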
Pandas:
cr.execute(sql)
df = cr.fetch_pandas_all()
Use version 3.8 of Python (as of Nov 2022)
This page said 3.9 was OK: https://docs.snowflake.com/en/user-guide/python-connector-install.html
I installed, but then the pip install "snowflake-snowpark-python[pandas]"
command said that only 3.8 was possible.
In Linux, downloaded gz from python.org and extracted to /usr/local/lib
sudo apt install libssl-dev libffi-dev
(If needed, do sudo dpkg-reconfigure pkg-name
????? and maybe sudo apt install libdvd-pkg
)
pip install "snowflake-snowpark-python[pandas]"
See more details in "Virtual Environments", under Installation.
from snowflake.snowpark import Session
from snowflake.snowpark.functions import avg # or other function
from snowflake.snowpark.functions import when
from snowflake.snowpark.functions import col
# alternate:
import snowflake.snowpark.functions as f
params = { "account": sf_account
, "user": sf_user
, "password": sf_pw
, "warehouse": "wsmall"
, "database": "first"
}
s = Session.builder.configs(params).create()
df = s.table("tb_nm") # returns a Snowpark DataFrame (API similar to PySpark)
df.write.mode("overwrite").save_as_table("new tb nm")
df = s.sql(sql) # no ";" !
df.show()
s.close()
Upload from local (two forward slashes after "file:"
). Returns a dataframe:
s.sql("PUT file://C:\. . .\basefilename*.csv @sch_nm.stage_name AUTO_COMPRESS=TRUE;").show() -- windows
s.sql("PUT file:///. . ./basefilename*.csv @sch_nm.stage_name AUTO_COMPRESS=TRUE;").show() -- linux
These return a dataframe with the list of files.
Look at the "status" column. "UPLOADED" means successful.
You can put again (overwrite), in which case the status shows "SKIPPED"
Show the files in the stage (returns a dataframe)
s.sql("list @sch_nm.stage_name ;").show()
Remove all the files in the stage. Notice the trailing slash (returns a dataframe).
s.sql("remove @sch_nm.stage_name/ ;").show()
Copy single file from stage to table
COPY INTO a_table
FROM @stage_name/file_name.ext.gz -- put file name here for single file
-- get the file name from the list
FILE_FORMAT = (FORMAT_NAME = a_format_name)
ON_ERROR = 'skip_file' -- or leave out if I want to fail on error
;
Copy multiple files from stage to a single table
COPY INTO a_table
FROM @stage_name -- put just stage name for multiple files
FILE_FORMAT = (FORMAT_NAME = a_format_name)
PATTERN='.*basefilename[1-5].csv.gz' -- put a pattern for the files
-- pattern appears to always start with "."
ON_ERROR = 'skip_file' -- or leave out if I want to fail on error
;
Write dataframe to a table
df.write.mode("overwrite").save_as_table("schema_name.table_name") #
returns None
Snowflake does not support indexes
Options for parameter if_exists: 'fail', 'replace', 'append'
See snippet python_snippets.html#snowpark1
Snowpark:
Look at later:
continue https://python-course.eu/python-tutorial/packages.php and https://docs.python.org/3/tutorial/modules.html
look at this: https://towardsdatascience.com/6-new-features-in-python-3-8-for-python-newbies-dc2e7b804acc
do this: https://docs.snowflake.com/en/user-guide/data-load-internal-tutorial.html
Other packages described below:
airflow
boto3 (AWS)
configparser
diagrams
django
faker
flask
graph-tool
itertools
json
jupyter
logging
networkx and pyvis
NLTK
parquet
pylint
pytest
random
re (regular expressions)
requests
smtp
yaml
zeep
Zen of Python: import this
imageMagick:
pip3 install Wand
for python3, use pip3
pipenv install pyarrow
import antigravity
Levity in Python
to format:
pipenv install black
# if needed: pipenv install black --pre
Web development:
Data science:
ML / AI:
other:
import random
random.random() # Generate a random number between 0 and 1
[random.random() for _ in range(4)] # Four random numbers between 0 and 1
random.seed(n) # n is a number
random.randrange(m) # Randomly choose between 0 and m-1
random.randrange(m, n) # Randomly choose between m and n-1
random.shuffle(a_list) # Shuffle the list in place
random.choice(a_list) # Choose one
random.sample(a_list, n) # Choose n in the list, without replacement
[random.choice(a_list) for _ in range(n)] # Choose n in the list, allowing duplicates (replacement)
import re
re.match("s", "long string") # match at the beginning of the string
re.search("s", "long string") # match anywhere in the string
re.split("[ ;.,]", "long string") # splits based on the separators [ ;,.]
re.sub("[1-9]", "0", "long string") # Substitute the digits 1-9 with zeros
Shell script:
export AIRFLOW_HOME=~/code/py/airflow
alias python='python3'
AIRFLOW_VERSION=2.2.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
airflow standalone
python -m airflow standalone
# Visit localhost:8080 in the browser and use the admin account details
# shown on the terminal to login.
# Enable the example_bash_operator dag in the home page
See code snippets in separate file
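A minimal DAG sketch (assuming Airflow 2.x; the file goes in the dags folder under AIRFLOW_HOME; dag and task names are made up):
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="minimal_dag",
         start_date=datetime(2022, 1, 1),
         schedule_interval="@daily",
         catchup=False) as dag:
    t1 = BashOperator(task_id="say_hello", bash_command="echo hello")
    t2 = BashOperator(task_id="say_done", bash_command="echo done")
    t1 >> t2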
pipenv install awscli
pipenv install boto3 # use this to connect from py to AWS
s3_rec = boto3.resource('s3')
bucket_obj = s3_rec.Bucket(bucket_name)
csv_obj = bucket_obj.Object(key=o.key).get().get('Body').read().decode('utf-8')
df = pd.read_csv(StringIO(csv_obj), delimiter=',')
# write
out_buffer = StringIO()
df.to_csv(out_buffer, index=False)
bucket_obj.put_object(Body=out_buffer.getvalue(), Key=filename)
Ready-to-use sample:
########
# sample file:
[a_group]
param1=a_value
db_name=the_db.{env.id.upper}
########
import configparser
config_file = "/home/. . ./.aws/credentials"
config = configparser.ConfigParser()
config.read(config_file)
database_name=config.get("a_group", "db_name") # see sample above
param_one=config.get("a_group", "param1") # see sample above
aws_access_key=config.get("default","aws_access_key_id") # based on standard .aws
aws_secret_key=config.get("default","aws_secret_access_key") # based on standard .aws
See under snippets python_snippets.html for a more detailed example
Note that spaces before and after the equal sign are ignored.
Spaces after the value are ignored.
Spaces in the value are kept.
A colon ":" can be used instead of an equal sign "="
The following are equivalent:
item=a value with spaces[EOL]
item: a value with spaces[EOL]
item = a value with spaces [EOL]
Hard-code in code:
config = configparser.ConfigParser()
# config.read(cfg_fn)
config.read_string("""
# put the contents of the config file here
[a_group]
param1=a_value
db_name=the_db.abc
""")
See snippets:
Diagrams needs graphviz
# pipenv install diagrams
# pipenv install graphviz
# Also download graphviz separately
from diagrams import Diagram, Cluster
import os
os.environ["PATH"] += os.pathsep + r"C:\progfile\Graphviz\bin"
import graphviz
######################################
from diagrams.aws.compute import EC2
from diagrams.aws.network import ELB
from diagrams.aws.network import Route53
with Diagram("descr", show=True, direction="TB") as diag:
# TB = towards bottom
dns = Route53("dns")
load_balancer = ELB("Load Balancer")
with Cluster("Webservers"):
svc_group = [EC2("wb 1"),
EC2("wb 2"),
EC2("wb 3")]
dns >> load_balancer >> svc_group >> dns
diag
######################################
from diagrams.generic.database import SQL
from diagrams.generic.storage import Storage
from diagrams.programming.flowchart import Document
from diagrams.programming.flowchart import Database
from diagrams.programming.flowchart import StoredData
with Diagram("stored proc") as diag:
src = Storage("the table")
sp = SQL("stored proc")
tgt = Storage("the tgt table")
st = StoredData("sd")
src >> sp >> tgt
diag
https://diagrams.mingrammer.com/
json.dumps(something_complex) #
serializes the object
json.dump(x, f) #
serializes and writes to file f
x = json.load(f) #
reads it back
import json
json.dump(obj, fp, ensure_ascii=True, # fp is a file pointer
indent=None, # None gives compact, 4 gives a prettier output
separators=None, #(item_separator, key_separator), such as (',', ':')
)
json.dumps(obj, ensure_ascii=True) # dumps to string
print(json.dumps(obj))
a_dict=json.load(fp,
parse_float=None,
parse_int=None,
parse_constant=None,
)
a_dict=json.loads(a_string)
json to python conversion (json --> python)
Dates as "YYYY-MM-DDTHH:MM:SS.sss"
If I get the error 'Object of type datetime is not JSON serializable', then provide a default function for serializing: json.dumps(the_object, default=str).
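A small sketch of the default=str workaround (the dict is made up):
import json
from datetime import datetime
obj = {"run_ts": datetime(2022, 4, 25, 10, 30), "status": "ok"}
print(json.dumps(obj, default=str))   # {"run_ts": "2022-04-25 10:30:00", "status": "ok"}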
See also Flask
Doc: https://2.python-requests.org//en/latest/user/quickstart/
#pip install requests
import requests
import json
response = requests.get("http://api.open-notify.org/astros.json")
print("Status=",response.status_code)
print("Response=",response.json())
# the response has three parts: message, request, response
import json
json.dumps(obj, sort_keys=True, indent=4)
See also zeep
Import:
import nltk
Run nltk.download() to open a popup for managing the downloads
Stopwords:
from nltk.corpus import stopwords
stopwords.words("english")
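For example, removing the stop words from a list of tokens (the word list is made up; requires nltk.download("stopwords") first):
from nltk.corpus import stopwords
sw = set(stopwords.words("english"))
words = ["it", "was", "the", "best", "of", "times"]
print([w for w in words if w not in sw])   # ['best', 'times']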
Doc:
import pandas as pd
import pyarrow # this is a parquet engine Alternative: fastparquet
pd.read_parquet(parquet_file, engine='auto')
See also requests
Shell Script:
#!/bin/bash
export FLASK_APP=min_flask
python3 -m flask run
File min_flask.py
from flask import Flask, redirect, url_for
app = Flask(__name__)
@app.route("/")
def hello_world():
return "<p>Namou</p>"
# export FLASK_APP=min_flask
# python3 -m flask run
@app.route("/h/")
def help_r():
return """<p>/h help<br />
/hello/a-name<br />
/param/help --> see /h<br />
/param/helloxyz -->> see hello<br />
/param/something<br />
/id/an-integer</p>"""
# end with / to be canonical
@app.route("/hello/<the_name>/")
def show_name(the_name): # parameter has to be what is between < and >
if type(the_name) is str:
return "<h1>Hello "+the_name+"! str</h1>"
else:
return "<h1>Hello "+the_name+"!xnot a strx</h1>"
@app.route("/id/<int:the_id>/")
def get_id(the_id):
return "<h1>The ID = "+str(the_id)+"</h1>"
@app.route("/param/<the_param>/")
def get_param(the_param):
if the_param=="help":
return redirect(url_for('help_r'))
elif the_param.startswith("hello"):
return redirect(url_for('show_name', the_name=the_param[5:]))
else:
return "<p>Unknown parameter: '"+the_param+"'</p>"
# needs: from flask import redirect, url_for
Another example:
pipenv shell
pipenv install flask
file:
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/')
def test():
return "Hello World"
@app.route('/pt', methods=['GET'])
# http://127.0.0.1:8080/p?pm=a-value
def test_param():
param = request.args.get("pm")
return jsonify({"found": param})
if __name__ == '__main__':
app.run(port=8080)
https://flask.palletsprojects.com/en/2.0.x/tutorial/layout/
/yourapplication
/yourapplication
__init__.py
/static
style.css
/templates
layout.html
index.html
login.html
...
pipenv install jupyter
Type: jupyter notebook
In the upper right, click on "new".
Shortcuts:
Clear output: Cell > all output > clear
Markdown: tables
|Header|Cells|
|---|---|
|Body of|the table|
Installation:
pip3 install networkx
pip3 install pyvis # For visualization
import networkx as nx # skip if just visualizing
from pyvis.network import Network
G = nx.Graph()
# G = nx.DiGraph() # directed graph
G.add_nodes_from([
(4, {"color": "red", "label": "4"}),
(5, {"color": "green", "label": "5"}),
(6, {"color": "purple", "label": "6"}),
(7, {"intensity": "strong", "label": "7"}),
])
# An edge with a node not entered earlier is added automatically (here 1)
G.add_edges_from([
(1,4,{'weight': 3, 'label': "1-4", "color": "yellow"}),
(4,5),
(6,4,{'weight': 2}),
(7,4,{'hidden': True})
])
G.add_edges_from([(5,5)])
G.remove_node(103) # removes its edges too
# add labels to all nodes
for n in G.nodes:
G.nodes[n]['label'] = "n" + str(n)
G.number_of_nodes()
G.number_of_edges()
G.nodes
G.edges
for n in G.nodes:
print("node: ",n)
print("adj:",G.adj[n])
print("deg: ",G.degree[n])
print(G)
g = Network(directed=True) # directed=True to see the arrows
g = Network(height="1500px", width="100%", bgcolor="#222222", font_color="white", directed=True)
g.from_nx(G) # if the graph was built with networkx
g.width="75%" # best for viewing with buttons, otherwise 100%
g.show_buttons(filter_=['physics']) # remember the underscore
g.set_edge_smooth("dynamic") # show multiple arrows
# select one of the following:
g.barnes_hut() # the default
g.force_atlas_2based()
g.hrepulsion()
g.show("give_a_file_name.html")
g.add_node("...", color="#...", shape="box")
g.add_edge("from...","to...", color="#...")
Node shapes:
Text has no shape, just the text
https://networkx.org/documentation/stable/tutorial.html
graph types:
Graph, DiGraph, MultiGraph, and MultiDiGraph
For edges, if there is only one numeric attribute, then use the 'weight' keyword for the attribute
list(nx.connected_components(G))
nx.clustering(G)
sp = dict(nx.all_pairs_shortest_path(G))
for e in list(G.edges)
dag
list(nx.topological_sort(graph)) # => ['root', 'a', 'b', 'd', 'e', 'c']
nx.is_directed_acyclic_graph(graph) # => True
https://networkx.org/documentation/stable/reference/algorithms/dag.html#
from matplotlib import pyplot as plt
g1 = nx.DiGraph()
g1.add_edges_from([("root", "a"), ("a", "b"), ("a", "e"), ("b", "c"), ("b", "d"), ("d", "e")])
plt.tight_layout()
nx.draw_networkx(g1, arrows=True)
plt.savefig("g1.png", format="PNG")
# tell matplotlib you're done with the plot: https://stackoverflow.com/questions/741877/how-do-i-tell-matplotlib-that-i-am-done-with-a-plot
plt.clf()
g2 = nx.DiGraph()
g2.add_edges_from([(1, 2), (2, 3), (3, 4), (4, 1)])
plt.tight_layout()
nx.draw_networkx(g2, arrows=True)
plt.savefig("g2.png", format="PNG")
plt.clf()
try this
https://towardsdatascience.com/graph-visualisation-basics-with-python-part-ii-directed-graph-with-networkx-5c1cd5564daa
https://networkx.org/documentation/stable/auto_examples/drawing/plot_directed.html
Some notes about graphs:
See https://en.wikipedia.org/wiki/Flowchart
Some key words: IDEF1X, Data flow, Yourdon, DeMarco
import graph_tool.all
https://graph-tool.skewed.de/
Basic outputs to console:
import logging
logging.basicConfig(level=logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, or CRITICAL
logging.info('something') # notice lower case here, and upper case above when selecting the level
levels, in decreasing level of verbosity:
DEBUG, INFO, WARNING, ERROR, CRITICAL
Each level displays messages of its level and those to the right
Formatted output to file
import logging
logging.basicConfig( filename=__name__ + ".log"
, level=logging.DEBUG
, format="%(asctime)s:%(levelname)s:%(name)s:%(filename)s:%(module)s:%(funcName)s:%(lineno)s:%(msg)s"
)
%(asctime)s
date and time
%(filename)s
Python script file name (lower case "n")
%(module)s
name of module (often filename without the ".py")
%(funcName)s
name of function (upper case "N")
%(levelname)s
shows DEBUG, INFO, . . (lower case "N")
%(msg)s
See more at https://docs.python.org/3/library/logging.html
Only one basicConfiguration is allowed per logger.
The default logger is the root logger.
To add loggers, do this:
import logging #
in every module
log_obj = logging.getLogger(__name__)
# __name__ by convention, can be anything
log_obj.setLevel(logging.DEBUG) #
I can set the level on the object (shown here) or on the handler (see below)
file_handler = logging.FileHandler("a file name")
formatter = logging.Formatter("%(asctime)s:%(name)s. . . .")
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.DEBUG) #
I can set the level on the object (see above), or on the handler (shown here)
log_obj.addHandler(file_handler)
# then call the logging with the object, not logging:
log_obj.debug("the msg")
Add another handler:
stream_handler = logging.StreamHandler()
streaming_formatter = logging.Formatter("%(asctime)s:%(name)s. . . .")
stream_handler.setFormatter(streaming_formatter) #
I can set a different formatter if I need
stream_handler.setLevel(logging.DEBUG) #
I can set a different level, other than the logger level
log_obj.addHandler(stream_handler)
Question: if I do not set anything on the logger objects, do I get the default logger level configuration?
To show the traceback in case of an error, do:
logging.exception("the msg")
instead of
logging.error("the msg")
Or use the log_obj
The following two are equivalent:
logging.exception("") #
better because I can put a message
and
logging.error(traceback.format_exc())
This results in an error:
logging.error("Error:", str(e))
logging.exception("Error:", str(e))
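A small sketch of logging.exception inside an except block; the message is free text and the traceback is appended automatically:
import logging
logging.basicConfig(level=logging.DEBUG)
try:
    1 / 0
except ZeroDivisionError:
    logging.exception("Error while dividing")   # logs at ERROR level and appends the traceback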
Suppress excessive debug messages in other packages:
logging.getLogger("module").setLevel(logging.INFO)
The "module"
can be "package.sub.module"
https://docs.python.org/3/library/logging.html
See snippet: python_snippets.html#logging_and_config
linter: pylint
first-line begins with ---
End of the document with ...
each line of text contains key and value pairs like a map.
key and values are separated by a colon (:) and space
use spaces instead of the tab for indentation
# Comments , can be in middle of line
# list, with list (brackets) and with bullets. Both are equivalent
key1:
- value1
- value2
- value3
- value4
- value5
or
key1: [value1,value2,
value3,value4,value5]
# indent for nesting the arrays
# associative array (here list of two associative arrays with id and name)
- id: 234
name: abc
- id: 567
name: fgh
or
[{id: 234, name: abc}, {id: 567, name: fgh}]
Strings do not need quotes, but I guess it is better with quotes
With double quotes, use \ backslash to escape
With single quotes, the only escape is double single quotes
In multiline strings, | preserves the newlines, > folds the newlines
A question mark can be used in front of a key, in the form "?key: value" to allow the key to contain leading dashes, square brackets, etc., without quotes.
Separate documents in the same stream with ---
(triple dash). Optionally end a document with triple period
&anchor01 #
define anchor label "anchor01"
*anchor01 #
references the anchor01. It allows re-use of the data
Explicitly define a data type:
Key: !!str a string
Key2: !!float
Options: !!float, !!str, !!binary
parsers:
http://yaml-online-parser.appspot.com/
http://www.yamllint.com/
Good intro:
https://en.wikipedia.org/wiki/YAML
Official spec: https://yaml.org/spec/1.2.2/
import logging
import logging.config
import yaml
def main():
"""
entry point to run the job.
"""
# Parsing YAML file
config = '...config.yml'
config = yaml.safe_load(open(config))
# configure logging
log_config = config['logging']
logging.config.dictConfig(log_config)
logger = logging.getLogger(__name__)
logger.info("This is a test.")
if __name__ == '__main__':
main()
yaml config file (needs more work):
# Logging configuration
logging:
version: 1
formatters:
the_app:
format: "The Name- %(asctime)s - %(levelname)s - %(message)s"
handlers:
console:
class: logging.StreamHandler
formatter: the_app
level: DEBUG
root:
level: DEBUG
handlers: [ console ]
pytest
has more features than the unittest
package. The unittest
package comes with python.
Documentation in https://docs.pytest.org/en/stable/
A test has four steps: arrange, act, assert, clean up.
pipenv shell
pipenv install pytest
pytest . # run the tests
pytest code/asdf.py -v # increase verbosity
pytest code/asdf.py -s # show output of prints
pytest -k "something" # k is keyword flag. Executes only test functions with name "test_.....something...."
pytest -m a_marker # Run only the tests with marker a_marker
pytest -m "not a_marker" # Exclude tests with marker a_marker
pytest -v # Verbose output: show more information
pytest -s # Show printed output (the result of 'print(...)' statements
pytest --durations=0 # Track the time of execution
import pytest
@pytest.mark.skip(reason="optional reason") # marker for skipping a test:
def test_should_be_skipped() -> None:
assert 1==2
@pytest.mark.skipif(3>1, reason="...")
def test_should_be_skipped() -> None:
assert 1==2
@pytest.mark.xfail
def test_shows_as_xfail_and_not_fail() -> None:
    # shows xpass or xfail in the results
    assert 1 == 2
@pytest.mark.any_marker
def ...
# then call with
# pytest . -m slow
# only the marked functions are tested
@pytest.mark.django_db
def ...
# Makes the following test spin up a database; a transaction is created just for that test, then rolled back when the test is completed
Fixtures are basically objects that appear in the code without me defining them.
Use for setup and teardown
The use of fixtures explicitly declares the dependencies (makes code more maintainable).
A fixture can use another fixture.
A fixture can have a function scope, class scope (run once per class), module scope (run once per module), or session scope (run once per session)
Fixture Without Arguments
@pytest.fixture
def the_fixture():
# do something
return ...
def test_that_uses_fixture(the_fixture): # the fixture is one of the parameters, without parentheses
# pytest looks at all fixtures and sees one called "the_fixture"
# it runs "the_fixture" and puts the result in the argument
print(f"Printing {the_fixture} from fixture")
Fixture With Arguments
@pytest.fixture
def the_fixture():
def _fixture_name(arg1, arg2):
...
return ...
    return _fixture_name # Here is the magic: return the inner function. No ()
def fctn_that_uses_fixture(the_fixture): # the fixture is one of the parameters, without parentheses
# here, the_fixture is a function that takes an argument
a = the_fixture(a1, a2)
You may want to put all fixtures in one file.
Place in the directory most appropriate for the scope of the fixtures.
Allows running a test multiple times with differing data
@pytest.mark.parametrize(
"the_input_param",
["A...", "B...", "C..."],
)
def test_paramtrzed(the_input_param: str) -> None:
print(f"\ntest with {the_input_param}")
# run multiple times with the different values
Indirect:
Look at the documentation.
Basically, when indirect=True, the parameters are passed to the "request" object from where they are extracted.
The parameters go to the fixture. Inside the fixture, the request object holds the parameters. Extract the parameters from the "request" object. The fixture object then returns the required data.
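A sketch of indirect parametrization; fixture and test names are made up:
import pytest

@pytest.fixture
def the_fixture(request):
    # request.param holds the value coming from the parametrize list
    return request.param.upper()

@pytest.mark.parametrize("the_fixture", ["abc", "xyz"], indirect=True)
def test_indirect(the_fixture):
    assert the_fixture.isupper()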
def fctn_that_raises_an_exception() -> None:
raise ValueError("an excep")
def test_raise_an_exception_should_pass() -> None:
with pytest.raises(ValueError):
fctn_that_raises_an_exception()
def test_raise_an_exception_should_pass_and_test_message() -> None:
with pytest.raises(ValueError) as e:
fctn_that_raises_an_exception()
# this tests the text of the message
assert "an excep" == str(e.value)
# Note that we need "str()" of the e.value
Test the test by doing a "pass" instead of "raise" in the called function fctn_that_raises_an_exception()
Add options in the pytest.ini file instead of typing pytest -v
each time:
[pytest]
addopts = -v -s
addopts = -v -s --durations=0
Markers have to be registered in pytest.ini
. The pytest.ini
file is at the root of the project.
[pytest]
markers =
this_is_a_marker: and this is the comment. Use this in test...py file: @pytest.mark.this_is_a_marker
Initialize the logger (as always).
Pass a parameter called "caplog" to the test function.
In the test function, simply assert that a given string is in caplog.text.
Note that by default only warning, error, or critical logs can be tested.
To test the "info" level, put the testing inside "with caplog.at_level(logging.INFO):" (a context manager).
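A minimal sketch, assuming a made-up do_work() function that logs at two levels:
import logging

logger = logging.getLogger(__name__)

def do_work():
    logger.warning("disk almost full")
    logger.info("details only at INFO level")

def test_warning_is_captured(caplog):
    do_work()
    assert "disk almost full" in caplog.text

def test_info_is_captured(caplog):
    with caplog.at_level(logging.INFO):   # temporarily capture INFO and above
        do_work()
    assert "details only at INFO level" in caplog.text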
Runs tests in parallel in multiple worker processes:
pipenv install pytest-xdist
pytest -n NUMCPUS
pytest -n auto # uses the max number of CPUs
pip install allure-pytest
pytest --alluredir=/tmp/test_rslts
allure serve /tmp/test_rslts
pip install pytest-sugar
Gives a nicer display of test results
Goal: concentrate on one set of functions without getting exceptions from other functions that are out of scope for my development and testing.
This means we do not send requests to the third party; instead we build responses that look like (mock) the real responses. However, remember to also test that the mocked responses match the real responses.
Patching replaces an object with another. If there is a function I do not want to run, "I can patch it out".
The patch function mocks the functionality of a given function; its parameter is the function that we want to mock. Mocking is creating an object with the same behavior, but with simpler dependencies.
from unittest.mock import Mock, MagicMock
a_mock_obj = MagicMock()
object_that_returns_a_value = MagicMock(return_value="my value")
object_that_raises_an_error = MagicMock(side_effect=ValueError("aaa"))
file mma.py
def f2mock():
print("in f2mock")
return 1234
file mmm.py
import mma
def f():
print("f start")
x = mma.f2mock()
return x
from unittest.mock import patch, MagicMock
import mmm # the module that has f()
# patching: first param is a string with target, second param is object to use instead
# patch out "mma.f2mock" because that is what shows in the code
# patch where the function is used, not where it is defined
@patch("mma.f2mock", MagicMock(return_value=13))
def test_mock_out_db_write():
assert mmm.f() == 13
# alternate: use a context manager:
def test_mock_out_db_write_with_context_manager():
assert mmm.f() == 1234
with patch("mma.f2mock", MagicMock(return_value=13)) as mock_dbwri:
assert mmm.f() == 13
Option #1 for designating the function to be mocked:
file mmm.py
import mma
def f():
return mma.f2mock()
file test_mmm.py
from unittest.mock import patch, MagicMock
import mmm
def test_mock_out_f2mock():
assert mmm.f() == 1234
with patch("mma.f2mock", MagicMock(return_value=13)) as mock_dbwri:
assert mmm.f() == 13
Option #2 for designating the function to be mocked:
file mmm.py
import mma as abc # as abc here
def f():
return abc.f2mock() # call abc.f2mock() not mma.f2mock()
file test_mmm.py
from unittest.mock import patch, MagicMock
import mmm
def test_mock_out_f2mock():
assert mmm.f() == 1234
with patch("mma.f2mock", MagicMock(return_value=13)) as mock_dbwri: # notice mma here, and not abc
assert mmm.f() == 13
Option #3 for designating the function to be mocked:
file mmm.py
from mma import f2mock as ddd # import just the function, and rename
def f():
return ddd() # calling with just ddd()
file test_mmm.py
from unittest.mock import patch, MagicMock
import mmm
def test_mock_out_f2mock():
assert mmm.f() == 1234
with patch("mmm.ddd", MagicMock(return_value=13)) as mock_dbwri: # notice mmm.ddd here
assert mmm.f() == 13
pipenv install responses
import requests
import responses

@responses.activate
def test_where_i_want_to_mock():
responses.add(method=responses.GET, url=anyurl, json={what I want to simulate}, status=200) # can be any status
rsp = requests.get(anyurl) # note: same url as above
assert rsp.json() == {what I want to simulate}
See https://www.djangoproject.com/
pipenv install djangorestframework
django-admin startproject the_name # this creates the service; the name must be a valid Python identifier (no hyphens)
cd the_name
python manage.py runserver # .../first-sub-dir-where-manage.py-is-located
python manage.py migrate
python manage.py createsuperuser
#go to http://127.0.0.1:8000/admin
python manage.py startapp application_name # create a new application inside the service; again a valid identifier, no hyphens
python manage.py makemigrations application_name # then migrate after each change
You may have to add:
export PYTHONPATH=/..../root_directory_of_project
In the models.py file: create a class that inherits from models.Model and add the attributes.
In the admin.py file, register the application's models.
Create a serializer that inherits from serializers.ModelSerializer (from rest_framework).
Create view sets in the views.py file (see the sketch after the router example below).
Put URLs in urls.py in the app directory. This file maps the routes (urls) to the functions.
from rest_framework import routers
from .views import CompanyViewSet
app_router = routers.DefaultRouter()
app_router.register("app_prefix", viewset=CompanyViewSet, basename="companies") # CompanyViewSet is defined in the views.py file
In the urls.py for the whole server, add:
from api.service_name.app_name.urls import app_router
add this line:
path("", include(app_router.urls))
Send email automatically when posting to ..../send_email
For testing, import the Django TestCase class from django.test.
Django's test runner switches to the "in-memory" (locmem) email backend, so no real email is sent.
Test by looking at the outbox: assert that it holds nothing before sending, and 1 email after sending.
We do NOT want it to fail silently (fail_silently=False).
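A minimal sketch, assuming the code under test simply calls django.core.mail.send_mail (the addresses are placeholders):
from django.core import mail
from django.core.mail import send_mail
from django.test import TestCase

class SendEmailTest(TestCase):
    # With TestCase, messages go to django.core.mail.outbox instead of being sent.
    def test_one_email_is_sent(self):
        assert len(mail.outbox) == 0
        send_mail(
            "subject", "body",
            "from@example.com", ["to@example.com"],
            fail_silently=False,   # we do NOT want silent failures
        )
        assert len(mail.outbox) == 1
        assert mail.outbox[0].subject == "subject"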
https://pypi.org/project/Faker/
from faker import Faker
fake = Faker() # default "en_US"
# other options: 'it_IT', 'fr_FR', ['en_US', 'ja_JP']
fake.name()
fake.address()
fake.text()
Also, command line:
faker --version
import itertools
(standard library)
Something is iterable if it has a method called "__iter__"; calling it returns an iterator, which remembers its state between calls.
Take any iterable object (list, tuple, . . .) and get its iterator:
it = obj.__iter__()
it = iter(obj) #
alternative syntax that does the same thing
Then, do next(it)
as needed to get the successive values, or of course do for a in it:
print(next(a_counter))
print(next(a_counter))
print(next(a_counter))
Note: a counter (itertools.count below) has no end. For a finite iterator, going past the end raises StopIteration.
Iterators can only go forward.
a_counter = itertools.count() #
optional parameters start
and step
(decimal possible)
c = itertools.cycle(a_iterator) #
cycle thru forever
c = itertools.repeat(a_value) #
repeat with an optional parameter times=
itertools.combinations(lst, n) #
(order does not matter)
itertools.permutations(lst, n) #
(order matters: (1,2) and (2,1) both listed)
itertools.combinations_with_replacement(lst, n) #
combinations that allow repeats
itertools.product(lst, repeat=n) #
cartesian product
itertools.product([0, 1], repeat=4) #
all possible values of 4 bits
Get part of a generator, and make a new generator (remember, nothing is yet calculated):
itertools.islice(gn, 7) #
The first 7
itertools.islice(gn, 2, 7) #
Skips 2, then returns the next 7-2=5
itertools.islice(gn, 2, 7, 2) #
start at index 2, stop before index 7, step by 2 (yields items 2, 4, 6)
Note that these three arguments are the same as for range().
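A few quick examples of the functions above (nothing here is specific to my code):
import itertools

evens = itertools.count(start=0, step=2)            # infinite iterator: 0, 2, 4, ...
first_five = list(itertools.islice(evens, 5))       # take only the first 5 values
print(first_five)                                   # [0, 2, 4, 6, 8]

print(list(itertools.combinations([1, 2, 3], 2)))   # [(1, 2), (1, 3), (2, 3)]
print(list(itertools.product([0, 1], repeat=2)))    # [(0, 0), (0, 1), (1, 0), (1, 1)]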
import smtplib
(standard library)
I have not tried this yet.
with smtplib.SMTP('server.com', 587) as ms:
ms.ehlo()
ms.starttls()
ms.ehlo()
ms.login(u, pw)
msg = "subject: . . .\n\n. . . ."
ms.sendmail(my_email, rcver_email, msg)
Run a local debugging mail server. Note that it has no TLS and no login (the smtpd module was removed in Python 3.12; aiosmtpd is a common replacement):
python -m smtpd -c DebuggingServer -n localhost:a_port_num
Alternative:
with smtplib.SMTP_SSL('server.com', 465) as ms:
ms.login(u, pw)
msg = "subject: . . .\n\n. . . ."
ms.sendmail(my_email, rcver_email, msg)
Make handling of message part easier:
from email.message import EmailMessage
msg = EmailMessage()
msg["Subject"] = ". . ."
msg["From"] = ". . ."
msg["To"] = ". . ." # for multiple, do a list, or a string with comma separated emails
msg.set_content(". . .")
msg.add_alternative(html_str, subtype="html") # optional: add an HTML version (method on the message, not on the SMTP object)
ms.send_message(msg) # send through the SMTP connection
Attach file
with open("file name", "rb") as f:
d = f.read()
img_type = "jpeg" # or use package imghdr: imghdr.what("file name")
fn = f.name
msg.add_attachment(d, maintype="image", subtype=img_type, filename=fn) # subtype from imghdr.what() or hard-coded "jpeg"
Generic: maintype="application", subtype="octet-stream"
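Putting the pieces together (untried, like the rest of this section; the server, port, and credentials are placeholders):
import smtplib
from email.message import EmailMessage

msg = EmailMessage()
msg["Subject"] = "Report"
msg["From"] = "me@example.com"
msg["To"] = "you@example.com"
msg.set_content("Plain-text body")
msg.add_alternative("<h1>HTML body</h1>", subtype="html")   # optional HTML part

with smtplib.SMTP_SSL("smtp.example.com", 465) as ms:       # placeholder server
    ms.login("user", "password")                            # placeholder credentials
    ms.send_message(msg)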
See also requests
http://www.python-course.eu/python3_deep_copy.php
continue with http://www.python-course.eu/python3_recursive_functions.php
http://www.sthurlow.com/python/lesson01/
http://wiki.python.org/moin/BeginnersGuide/Programmers
Brent Welch's "Practical Programming in Tcl and Tk"
When opening files, use a context manager. This forces the file to close if the code throws an error.
Instead of:
f = open(...)
f.close()
Do this:
with open(...) as f:
With a bare "except" clause, a ctrl-C triggers an exception, yet I want to stop the execution.
Instead, do "except ValueError:" or whatever error type. Otherwise, do "except Exception e:"
It is better to not handle the exception than to do a "pass". Remove useless exception clauses.
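A small illustration (the parsing code is just an example):
try:
    value = int("not a number")
except ValueError:          # specific: Ctrl-C still stops the program
    value = 0
except Exception as e:      # broad fallback; still lets KeyboardInterrupt/SystemExit through
    print(f"unexpected error: {e}")
    raise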
Instead of checking for a type this way:
if type(a) == a_type:
Do
if isinstance(a, a_type):
is keyword:
if x is None:
if x is True: # or just: if x
if x is False: # or just: if not x
Do NOT do:
x == None
x == True
x == False
Note: the result has the length of the shortest input: list(zip([1,2],[])) gives []
a = ["a", "b", "c", "d"]
b = [1,2,3]
c = ["α", "β", "γ"]
z = zip(a, b, c)
[zz for zz in z]
# returns: [('a', 1, 'α'), ('b', 2, 'β'), ('c', 3, 'γ')]
To loop over the keys of a dictionary, do for k in d, not for k in d.keys(), as .keys() is not necessary here.
Note: if I am going to modify the keys, then iterate over a copy: for k in list(d)
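For example, with a toy dictionary:
d = {"a": 1, "b": 2}
for k in d:           # same as iterating over d.keys(), just shorter
    print(k, d[k])
for k in list(d):     # iterate over a copy when the loop changes the keys
    if d[k] == 1:
        del d[k]      # deleting while iterating over d itself would raise RuntimeError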
Instead of
a = mytuple[0]
b = mytuple[1]
do
a, b = mytuple
If I do not know the full length of the tuple:
a variable preceded by an asterisk takes on the list of all remaining values, and is empty if there are none.
a, b, *c, d = (1,2,3,4,5,6)
Or, if I do not intend to use the remaining values, use *_ (a convention, not a requirement):
a, b, *_ = (1,2,3,4)
For inputing password on the screen:
import getpass
u = input("username: ")
p = getpass.getpass("password: ")
Instead of for i in range(len(lst))
do
lst = [3,4,5]
for i,a in enumerate(lst, start=1):
or, when there are two lists:
a = [3,4,5]
b = [5,6,7]
for aa,bb in zip(a,b):
When counting in the two lists:
a = [3,4,5]
b = [5,6,7]
for i, (aa,bb) in enumerate(zip(a,b)):
Note, enumerate
starts at 0 by default; add start=...
if needed.
See also "enumerate".
Instead of
i=0
for a in lst:
...
i += 1
Do
for i,a in enumerate(lst):
...
At the end, i+1 is the number of loops that were done (or use enumerate(lst, start=1) so that i is the count).
Of course, this does not work if I have a conditional count.
PEP 8 is a style guide. See https://peps.python.org/pep-0008/
Never pass a mutable object as a default value for parameter. Instead of
def asdf(lst=[]):
# lst is an empty list at first call, but this picks up the previous value in the subsequent calls
Do this:
def asdf(lst=None):
# instead, set the default to "lst=None"
This is because Python evaluates default values once, when the function is defined. Set the default to None instead and create the list inside the function:
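The usual idiom:
def asdf(lst=None):
    if lst is None:        # a fresh list is created on every call
        lst = []
    lst.append("item")     # example use
    return lst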
Watch this unexpected result: (sneaky behavior to be aware of):
import time
import datetime
def show_curr_time(t=datetime.datetime.now()): # the default value is the current time when the function is defined!
print(t)
show_curr_time()
time.sleep(1)
show_curr_time() # same value as above
The last element of a list or string is at the position len(...)-1.
The following code will throw an error: a_string[len(a_string)].
Instead, do this: a_string[len(a_string)-1], or better still a_string[-1].
Related to this, the following will throw an error when the_pos == len(the_string):
the_string[the_pos] if len(the_string) >= the_pos else ''
Use the following code instead (notice ">" not ">="):
the_string[the_pos] if len(the_string) > the_pos else ''
Display large numbers with separators "_" (underscore):
x = 32_439_318
Print with a separator: print(f"{x:,}") for commas, or print(f"{x:_}") for underscores
https://docs.python.org/2/howto/webservers.html
http://fragments.turtlemeat.com/pythonwebserver.php
http://www.linuxjournal.com/content/tech-tip-really-simple-http-server-python
https://wiki.python.org/moin/WebProgramming
http://docs.python-guide.org/en/latest/scenarios/web/
mod_wsgi (Apache) (Embedding Python)
mod_wsgi allows you to run Python WSGI applications on Apache HTTP Server.
https://pypi.python.org/pypi/mod_wsgi
http://modpython.org/
http://www.onlamp.com/pub/a/python/2003/10/02/mod_python.html
See details on package installation: https://packaging.python.org/tutorials/installing-packages/
The Classical Language Toolkit https://github.com/cltk/cltk
natural language pipeline that supports massive multilingual applications https://pypi.python.org/pypi/polyglot/
Text-Fabric, includes a graph-like approach
https://pypi.python.org/pypi/text-fabric/
See also (collection of richly annotated data sources, including HB and GNT)
https://github.com/ETCBC/text-fabric-data
Library for working with neo4j and graphs https://github.com/bruth/graphlib/
Another library https://github.com/svasilev94/GraphLibrary
Graph visualization https://pypi.python.org/pypi/graphistry/0.9.51
High performance graph data structures and algorithms
https://pypi.python.org/pypi/python-igraph/0.7.1.post6
See also
http://igraph.org/python/doc/tutorial/tutorial.html
Graphyne is a smart graph: a property graph capable of actively reacting to changes and incorporating decision-making logic, written in Python https://github.com/davidhstocker/Graphyne