Python Snippets

These are my personal notes that I use as a quick help in my work.
#!/usr/bin/python
import sys
print('Hello World!')
print(sys.version)
import pyodbc
import sys
import os
import ct_write_html
import ct_cn

input_param1 = sys.argv[1]
input_param2 = sys.argv[2]
fo = open(fo_name, "w")  # fo_name is defined elsewhere in the script

def display_results_3(fo, cr_sqlsA, cr_sqlsB, cr_sqlsC, the_env, pw, sql_sqlS, sql_ora, param_list_sqlS, param_list_ora, title, num_cols_in_key, num_cols_to_compare, num_cols_to_add):
    res_sqlSA = ct_cn.run_query(cr_sqlsA, sql_sqlS, param_list_sqlS)
    res_sqlSB = ct_cn.run_query(cr_sqlsB, sql_sqlS, param_list_sqlS)
    if the_env == "PROD":
        res_sqlSC = ct_cn.run_query(cr_sqlsC, sql_sqlS, param_list_sqlS)
    else:
        res_sqlSC = ([], [])
    # connect, query, and close, to prevent ORA timeout
    cnora = ct_cn.ora(the_env, pw)
    cr_ora = cnora.cursor()
    res_ora = ct_cn.run_query(cr_ora, sql_ora, param_list_ora)
    ct_write_html.display_results_of_two_sets_3instances(fo, res_sqlSA, res_sqlSB, res_sqlSC, res_ora, "eCC", "KC", "row count")
    res3 = ct_cn.put_three_in_one(res_sqlSA, res_sqlSB, res_sqlSC, num_cols_in_key, num_cols_to_add)
    ct_write_html.display_differences(fo, res3, res_ora, "eCC", "KC", num_cols_in_key, num_cols_to_compare, False)
    cnora.close()

empty_list = []
cnsqlsA.close()  # the SQL Server connections are created elsewhere in the script
cnsqlsB.close()
fo.close()
os.startfile(fo_name)
print("Completed")
import pyodbc

def ora(env, pw):
    print("Creating connection for env=" + env + " in ct_cn.ora")
    #print("pw=" + pw)
    if env == 'DV2':
        return pyodbc.connect("DSN=DATWDV2;PWD=" + pw)
    elif env == 'QA':
        return pyodbc.connect("DSN=DATWQA;PWD=" + pw)
    elif env == 'PS':
        return pyodbc.connect("DSN=DATWPS;PWD=" + pw)
    elif env == 'PROD':
        return pyodbc.connect("DSN=DATW;PWD=" + pw)
    else:
        print("env=" + env + " not recognized in ct_cn.ora")  # falls through and returns None
def run_query(cr, sql_cmd, sql_param):
    # The result has two elements: the description of the columns and the list of rows
    cr.execute(sql_cmd, sql_param)
    # get the column descriptions
    col_desc = [d[0] for d in cr.description]
    #print("Columns: ", col_desc)
    # get the rows
    results = []
    for one_row in cr:
        results.append(one_row)
    # put the column descriptions and the results together
    result = (col_desc, results)
    print(results)
    return result
def print_results(col_2dimres):
    # this assumes a tuple with col desc in 1st element and results in 2nd
    # see run_query above
    print("columns: ", col_2dimres[0])
    for i, rw in enumerate(col_2dimres[1]):
        print("row", i+1, ": ", rw)
def make_result_dict(r, num_key_cols):
    # The output is a dictionary:
    # the keys are tuples built from the values in columns 0 .. num_key_cols-1,
    # the values are the rows from the query.
    # The input is assumed to be a tuple (col desc, results) as created above,
    # so the rows in the second element of the tuple become the values
    # in the resulting dictionary.
    # The column descriptions are not put into the dictionary.
    the_dict = dict()
    #print("r 1 ", r)
    for rw in r[1]:
        #print("rw=", rw)
        the_dict[rw[0:num_key_cols]] = rw
    return the_dict
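A sketch of using make_result_dict to compare two result sets on a two-column key; res_a and res_b are hypothetical results from run_query:

dict_a = make_result_dict(res_a, 2)
dict_b = make_result_dict(res_b, 2)
for key in dict_a:
    if key not in dict_b:
        print("only in A:", key)
    elif dict_a[key] != dict_b[key]:
        print("differs:", key)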
import csv

def write_results(filename, col_2dimres, src, delim_char=',', quote_char='"'):
    # this assumes a tuple with col desc in 1st element and results in 2nd
    # see ct_cn
    with open(filename, 'w', newline='') as fo:
        lines_out = csv.writer(fo, delimiter=delim_char, quotechar=quote_char, quoting=csv.QUOTE_MINIMAL)
        lines_out.writerow(col_2dimres[0])  # writerow belongs to the csv writer, not the file
        for rw in col_2dimres[1]:
            lines_out.writerow(rw)
        # no explicit close needed: the "with" block closes the file
import numpy as np
import matplotlib
backend = 'Agg'
matplotlib.use(backend)
import matplotlib.pyplot as plt
x = np.linspace(0, 1)
y = np.sin(4 * np.pi * x) * np.exp(-5 * x)
plt.fill(x, y, 'r')
plt.grid(True)
#plt.show()
plt.savefig('glu')  # writes glu.png (png is the default format)
import numpy as np
import matplotlib.pyplot as plt
x=np.arange(0.0, 2.1, 0.01) # upper end of range: open
plt.plot(x, np.sin(np.pi * x))
plt.plot(x, np.cos(np.pi * x))
plt.plot(x, -1* np.sin(np.pi * x))
plt.plot(x, -1* np.cos(np.pi * x))
plt.axis([0,2,-1.1,1.1])
plt.show()
plt.axhline(305, linewidth=3, color='r')  # horizontal line at y=305
import datetime as dt
import matplotlib.dates as mdates
dates = mdates.drange(dt.datetime(2010, 1, 1), dt.datetime(2012, 11, 1), dt.timedelta(weeks=3))
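A sketch of plotting against those dates; drange returns matplotlib date numbers, and the y values here are made up:

import numpy as np
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.plot(dates, np.random.rand(len(dates)))  # made-up y values
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
fig.autofmt_xdate()  # tilt the date labels so they do not overlap
plt.show()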
plt.xlabel('...', fontsize=14, color='red')
plt.ylabel('...')
plt.title('...')
plt.text(x, y, r'$text$')  # r'' is a raw string (so TeX backslashes are not escaped); $...$ renders as math text
plt.axis([40, 160, 0, 0.03])
plt.grid(True)
plt.annotate('...', xy=(2, 1), xytext=(3, 1.5),
             arrowprops=dict(facecolor='black', shrink=0.05),
             )
def readln(f):
    try:
        li = f.readline()
    except UnicodeDecodeError:
        li = ""
    return li
# read a file and count the row content:
# creates a dictionary of all line values and counts the frequency
import os

fo2 = open("list_of_error_counts.txt", 'w')
f = "C:\\Users\\Z0242594\\Documents\\ct_files\\sql\\datw\\proc_adm_20151216_load_all_errors.txt"
print("Reading:", f, type(f))
ff = open(f, 'r')
li = ff.readline()
line_count = 0
distinct_line_count = {}
while len(li) > 0:
    line_count += 1
    if repr(li) in distinct_line_count:
        distinct_line_count[repr(li)] += 1
    else:
        distinct_line_count[repr(li)] = 1
    li = ff.readline()
ff.close()
for d in distinct_line_count:
    fo2.write(d + ": " + str(distinct_line_count[d]) + '\n')
fo2.close()
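The same counting can be done with collections.Counter from the standard library; a sketch over the same file:

from collections import Counter

with open(f, 'r') as ff:
    counts = Counter(repr(li) for li in ff)
with open("list_of_error_counts.txt", 'w') as fo2:
    for line, n in counts.items():
        fo2.write(line + ": " + str(n) + '\n')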
# Reads a file and looks at the lines:
# counts rows, ignores empty lines and lines with "1 row created.",
# and outputs all other rows to a file
import os

max_col_len = []
for i in range(18):
    max_col_len.append(0)
print(max_col_len)
fo = open("list_of_errors.txt", 'w')
f = "C:\\Users\\Z0242594\\Documents\\ct_files\\sql\\datw\\proc_adm_20151216_load_all_errors.txt"
print("Reading:", f, type(f))
ff = open(f, 'r')
li = ff.readline()
line_count = 0
line_ct_ok = 0
line_ct_err = 0
while len(li) > 0:
    line_count += 1
    if li == "\n":
        pass
    elif li[0:14] == "1 row created.":
        line_ct_ok += 1
    else:
        fo.write(li)
        line_ct_err += 1
    li = ff.readline()
ff.close()
print(f, " has ", line_count, " lines")
print(f, " has ", line_ct_ok, " rows loaded")
print(f, " has ", line_ct_err, " errors")
fo.close()
# Loops through all files in a folder and treats "*.txt" and "*.sql"
import os

for filename in os.listdir(src_dir):
    if filename.endswith((".txt", ".sql")):
        full_file_name = src_dir + "\\" + filename
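A pathlib variant of the same loop, which joins paths without hard-coded backslashes:

from pathlib import Path

for p in Path(src_dir).iterdir():
    if p.suffix in (".txt", ".sql"):
        full_file_name = str(p)  # the Path already includes the folder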
import os

fo = open("proc_adm_20151216_insert_stmts.sql", 'w')
fo.write('\n')
fo.write('set define off\n')  # turn off define
fo.write('\n')
for f in os.listdir(os.getcwd()):
    if f.endswith(".txt"):
        print("Reading:", f, type(f), " sep=", date_sep)  # date_sep is defined elsewhere
        skip_first = True
        ff = open(f, 'r')
        li = ff.readline()
        line_count = 0
        while len(li) > 0:
            line_count += 1
            if skip_first:
                skip_first = False
            else:
                liel = li.split('|')
                # now make the statement string
                stmt = "insert into diagnosis_corr_20151216 (MRN, Clinic, OldStatus, HistoryRecor)"
                stmt += " values ("
                stmt += "to_date('" + liel[7] + "', 'MM/DD/YYYY HH:MI:SS AM'), "  # date
                stmt += "'" + liel[9].replace("'", "''") + "', "  # handle single quotes
                stmt += "'" + f + "' "  # add the file name too
                stmt += ");"
                fo.write(stmt + '\n')
            li = ff.readline()
        ff.close()
fo.write('\n')
fo.write('commit;\n')
fo.write('\n')
fo.close()
import sqlite3

db_name = "test_sql2cvs.db"
conn = sqlite3.connect(db_name)
cr = conn.cursor()
p = (3, 6)
cr.execute('select * from a where i in (?,?) ', p)
print([col_desc[0] for col_desc in cr.description])  # column descriptions
for one_row in cr:
    print(one_row)
cr.execute(sql_cmd)  # some other command, e.g. an insert
conn.commit()
conn.close()
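For the snippet to run, the table has to exist first; a minimal setup sketch (table a and column i are taken from the query above, the txt column is made up):

conn = sqlite3.connect(db_name)
cr = conn.cursor()
cr.execute('create table if not exists a (i integer, txt text)')
cr.executemany('insert into a values (?,?)', [(3, 'three'), (6, 'six')])
conn.commit()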
import sqlite3
import csv
import time
# https://docs.python.org/3.4/library/csv.html
db_name = "test_sql2cvs.db"
conn = sqlite3.connect(db_name)
cr = conn.cursor()
fn_out = "test_sql2cvs.csv"
print("reading database " + db_name + " and inserting into " + fn_out)
cr.execute('select * from a')
print(cr.description)
print([col_desc[0] for col_desc in cr.description])  # get the first item in each description
with open(fn_out, 'w', newline='') as fo:
    lines_out = csv.writer(fo, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    lines_out.writerow([col_desc[0] for col_desc in cr.description])
    for one_row in cr:
        lines_out.writerow(one_row)
        print(one_row)
cr.execute('select * from a where i in (?,?) ', p)  # p as defined above, e.g. (3, 6)
for one_row in cr:
    print(one_row)
# # Never do this -- insecure!
# symbol = 'glu'
# c.execute("select * from stocks where symbol = '%s'" % symbol)
# Do this instead
t = ('glu',)
c.execute('select * from stocks where symbol=?', t)
# Larger example
for t in [('2006-03-28', 'BUY', 'IBM', 1000, 45.00),
          ('2006-04-05', 'BUY', 'MSFT', 1000, 72.00),
          ('2006-04-06', 'SELL', 'IBM', 500, 53.00),
          ]:
    c.execute('insert into stocks values (?,?,?,?,?)', t)
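The loop can also be written with executemany, the usual idiom for bulk inserts:

rows = [('2006-03-28', 'BUY', 'IBM', 1000, 45.00),
        ('2006-04-05', 'BUY', 'MSFT', 1000, 72.00),
        ('2006-04-06', 'SELL', 'IBM', 500, 53.00)]
c.executemany('insert into stocks values (?,?,?,?,?)', rows)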
import json, sqlite3, sys
import pandas as pd
sys.path.append("..\\ingit\\DG-py-collibra-rest-api\\core")
import utilities
import clb_functions

if len(sys.argv) < 3:
    print("Two arguments are required: the file of posts and the file of votes. Include the path.")
    sys.exit(1)
else:
    posts_file_in = sys.argv[1]
    votes_file_in = sys.argv[2]

if db_conn:
    ....
    # close the database connection at the end
    db_conn.close()
else:
    print("Could not open the database '" + db_name + "'")
print("...done")
SAFE_CHAR_FOR_FILENAME = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789.-_"

def safe_string(s):
    if s:
        return ''.join([c for c in s if c in SAFE_CHAR_FOR_FILENAME])
    else:
        return ""

file_name = safe_string(...)
# split the dataframe into smaller pieces; the result is a "group" of dataframes
for g in all_data.groupby(['env', 'domain_name', 'ast_type']):
    # g[0] has the values of the index, g[1] has the smaller dataframes
    # an index is needed for the unstack
    r = g[1].set_index(['env', 'domain_id', 'domain_name', 'ast_type', 'ast_full_name', 'ast_display_name', 'ast_uuid', 'ast_descr', 'ast_rec_ty', 'char_type'])
    write_to_file(r.unstack(level=-1), None, output_filebasename, "_" + safe_string("_".join(g[0])))
import time
curr_dttm = time.strftime("%Y%m%d") + "_" + time.strftime("%H%M%S")
output_filebasename += "_" + curr_dttm
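Note that the two strftime calls can straddle a second (or midnight) boundary; a single call keeps date and time consistent:

curr_dttm = time.strftime("%Y%m%d_%H%M%S")  # one call, one snapshot of the clock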
the_sql = """select ast.env """
just_assets = pd.read_sql_query(the_sql , db_conn, params=the_params)
write_to_file(just_assets, the_sql, output_filebasename, "_raw_ast")
print("Loaded list of all assets")
the_sql = """select ast.env ..."""
attributes = pd.read_sql_query(the_sql , db_conn, params=the_params)
write_to_file(attributes, the_sql, output_filebasename, "_raw_att")
print("Number of attributes:",str(len(attributes)))
the_sql = """select ast.env... """
relations_to = pd.read_sql_query(the_sql , db_conn, params=the_params)
print("Number of relations to:",str(len(relations_to)))
# join all, vertically; DataFrame.append was removed in pandas 2.0, so use pd.concat
all_data = pd.concat([just_assets, attributes, relations_to, relations_from], ignore_index=True, sort=False)
#print(str(len(attributes))+'+'+str(len(relations_from))+'+'+str(len(relations_to))+'='+str(len(all_data)),str(len(attributes)+len(relations_from)+len(relations_to)==len(all_data)))
write_to_file(all_data.set_index(['env', 'domain_name', 'domain_id', 'ast_type', 'ast_full_name', 'ast_uuid', 'char_type']), None, output_filebasename, "_raw_all")
tmp = sys.argv[3].strip('"').strip()  # strip outside double quotes
tmp_list = tmp.split(',')  # assume comma separated
out_list = []
for f in tmp_list:
    out_list.append(f.strip('"').strip())  # strip off inner set of double quotes
from configparser import ConfigParser

config = ConfigParser()
config.read('setup.cfg')
print(config.sections())
for sct in config.sections():
    for k, i in config[sct].items():
        print(sct, ": ", k, "=", i)

def config_file_to_dct(config_file):
    config = ConfigParser()
    config.read(config_file)
    #print(config.sections())
    config_dct = {}
    for sct in config.sections():
        config_dct[sct] = {k: i for k, i in config[sct].items()}
    return config_dct

def print_config(config_dct):
    for k, i in config_dct.items():
        for kk, ii in i.items():
            print(f"section '{k}': '{kk}' = '{ii}'")

config_dct = config_file_to_dct(config_file)
print_config(config_dct)
a_setting = config_dct["section"]["element"]
def write_to_file(df, sql, basename, sfx):
    # Print the contents of the data frame
    df.to_csv(basename + sfx + ".csv", index=True, header=True, sep=",")
    print("wrote to ", basename + sfx + ".csv")
    # If the SQL query was given, print it in a separate file with a similar name
    if sql:
        f = open(basename + sfx + ".sql", "w", encoding="utf-8")
        f.write(sql)
        f.close()
perm_list = []
for r in user_roles:
    one_role = {}
    one_role["usr_id"] = user_id
    one_role["grp_id"] = r.get("own_id")
    perm_list.append(one_role)
import json
json.dumps(something_complex, indent=4) # serializes the object
json.dump(x, f) # serializes and writes to file f
x = json.load(f) # reads it back
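The counterpart for reading from a string is json.loads:

x = json.loads('{"k": "v"}')  # parses a JSON string into a dict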
import logging
import configparser
config_file_aws = "/. . ./.aws/credentials"
config_file_snf = "/. . ./.snfl.txt"
config_aws = configparser.ConfigParser()
config_aws.read(config_file_aws)
aws_access_key=config_aws.get("default","aws_access_key_id")
aws_secret_key=config_aws.get("default","aws_secret_access_key")
config_snf = configparser.ConfigParser()
config_snf.read(config_file_snf)
########## contents of config file
# [acct]
# a=my_account_name
# [main]
# u=my_user_name
# p=my_super_secret_pw
##########
sf_account = config_snf.get("acct", "a")
sf_user = config_snf.get("main", "u")
sf_pw = config_snf.get("main", "p")
logging.basicConfig( filename=__name__ + ".log"
                   , level=logging.INFO
                   #, level=logging.DEBUG
                   , format="%(asctime)s:%(levelname)s:%(name)s:%(filename)s:%(module)s:%(funcName)s:%(lineno)s:%(msg)s"
                   )
from snowflake.snowpark import Session
from snowflake.snowpark.functions import when

def exec_qry(s, sql):
    logging.info(f"About to execute:\n====\n{sql}\n=====")
    print(f"About to execute:\n====\n{sql}\n=====")
    df = s.sql(sql)
    logging.debug("Completed execution of query")
    return df

params = { "account": sf_account
         , "user": sf_user
         , "password": sf_pw
         , "warehouse": "wsmall"
         , "database": "first"
         }
s = Session.builder.configs(params).create()
df = exec_qry(s, "select current_version() as v")
df.show()
qry = "SELECT * FROM exer.firstb"
df = exec_qry(s, qry)
df.show(3)
# show
print("distinct countries: ",df.select(["CNTRY_NM", "CNTRY_CD"]).distinct().count(), sep="")
# Country or region code
df = df.withColumn("CRCD", when(df["CNTRY_CD"].isNull(), df["CNTRY_NM"]).otherwise(df["CNTRY_CD"]))
df_countries = df.select(["CNTRY_NM", "CNTRY_CD", "CRCD"]).distinct().na.fill("MULTIPLE", subset=["CNTRY_CD"])
df_crcd = df.select(["CRCD", "YEAR"]).distinct()
df_crcd.show(30)
# avg funding received
df_avg_funding_rcvd = df.filter(df["METRIC"] == "Funding received").groupBy(["YEAR", "CRCD"]).avg("VALUE").withColumnRenamed("AVG(VALUE)", "AVG_FUNDING")
print("avg funding received")
df_avg_funding_rcvd.show()
# avg people targeted
df_avg_tgt_peo = df.filter((df["METRIC"] == "People in need") | (df["METRIC"] == "People targeted")).groupBy(["YEAR", "CRCD"]).avg("VALUE").withColumnRenamed("AVG(VALUE)", "AVG_PEOPLE")
print("avg people in need or targeted")
df_avg_tgt_peo.show()
# all Mali
df.filter(df["CNTRY_NM"] == "Mali").show()
# count values (profiling); disabled with "if False"
if False:
    for c in df.columns:
        dfc = df.groupBy(c).count()
        # print(c, ": ", df.select(c).distinct().count(), sep="")  # same as the next line
        print(c, ": ", dfc.count(), sep="")
        dfc.orderBy(dfc["COUNT"].desc()).show(5)
dfall = df_crcd.join(df_avg_funding_rcvd, on=["CRCD", "YEAR"], how="left").join(df_avg_tgt_peo, on=["CRCD", "YEAR"], how="left")
# remove rows where either people or funding is null
dfall = dfall.na.drop()
dfall = dfall.filter(dfall["AVG_FUNDING"] != 0).filter(dfall["AVG_PEOPLE"] != 0)
dfall = dfall.withColumn("AVG_FD_PER_PERS", dfall["AVG_FUNDING"]/dfall["AVG_PEOPLE"])
# save the tables
dfall.write.mode("overwrite").save_as_table("exer.dfall")
df_avg_funding_rcvd.write.mode("overwrite").save_as_table("exer.average_funding_received")
df_avg_tgt_peo.write.mode("overwrite").save_as_table("exer.average_tgt_peo")
# read back one of the tables
qry = "SELECT * FROM exer.average_tgt_peo"
df = exec_qry(s, qry)
df.show()
logging.info(". . .done")
For an example of how to get the access key ID and the secret key, see the configparser snippet in python_notes.html.
import boto3
from botocore.exceptions import ClientError

def list_s3():
    try:
        s3_resource = boto3.resource(
            "s3"
            , aws_access_key_id = aws_access_key
            , aws_secret_access_key = aws_secret_key
            )
    except ClientError as e:
        print("Security token not set correctly (not authenticated correctly)")
        raise e
    print("=========== List of buckets:")
    for bucket in s3_resource.buckets.all():
        print(bucket.name)
    print("=========== End of bucket list")

if __name__ == "__main__":
    list_s3()
import boto3
from botocore.exceptions import ClientError

def get_secret():
    session = boto3.session.Session()
    client = session.client( service_name = "secretsmanager"
                           , region_name = "us-east-1"
                           , aws_access_key_id = aws_access_key
                           , aws_secret_access_key = aws_secret_key
                           )
    try:
        secret_value_response = client.get_secret_value(SecretId = "thefirst")
    except ClientError as e:
        print("Security token not set correctly (not authenticated correctly)")
        raise e
    # secret_value_response is a dictionary
    #print("the response", str(secret_value_response))
    print("============= The values retrieved")
    for k, v in secret_value_response.items():
        print(f"{k}: {v}")
    print("============= end of values retrieved")
    the_secret = secret_value_response["SecretString"]  # the value for "SecretString" looks like a dictionary,
                                                        # but it is a string in format {"k":"v"}
    print("the name is ", secret_value_response["Name"])  # the name, same as the SecretId above in the "get_secret_value" call
    return the_secret

if __name__ == "__main__":
    print("found: ", get_secret())
<config_prefix>/my-config-section/my-setting:
my-config-section:
  my-setting: my value
import os
import boto3
ssm = boto3.client('ssm')
parameter = ssm.get_parameter(Name=" name here ", WithDecryption=True)
print(parameter['Parameter']['Value'])
https://docs.aws.amazon.com/systems-manager/latest/userguide/systems-manager-parameter-store.html
Count, and show "Fizz" every 5 and "Buzz" every 3
for num in range(20):
    if (not num % 5) and (not num % 3):
        print("FizzBuzz")
    elif not num % 5:
        print("Fizz")
    elif not num % 3:
        print("Buzz")
    else:
        print(str(num))
Fibonacci sequence
# Fibonacci
a, b = 0, 1
for i in range(10):
    print(i, ":", a)
    a, b = b, a + b

Fibonacci with generator
def fib(num):
    a, b = 0, 1
    for i in range(num):
        yield (i, a)  # notice "yield" but no "return"
        a, b = b, a + b

[f for f in fib(10)]