Python Snippets

  

These are my personal notes that I use as a quick help in my work.
You are welcome to read them.

Contents of current page Top-level home page
 
Index  Java Internet Oracle Notes
Linux Basics Web Basics SQL Notes
Informatica Servlets Apache BkpRstore SQL*Plus
Teradata   LDAP Storage PL/SQL
Windows     Tables OEM
UML   Net8 Portal
SQL Server Python perl Performance OLAP
Vmware Visual Basic PHP/MySQL User Mgmt  
Git        
More technical pages here

Contents

 

 


Basic

#!/usr/bin/python
# Minimal smoke test: print a greeting and the running interpreter version.
import sys
print('hello World!')
print(sys.version)

 

 


Read Data from Three Instances

# Script header: standard imports plus two project-local helper modules
# (ct_write_html, ct_cn) that must be importable from the working directory.
import pyodbc
import sys
import os
import ct_write_html
import ct_cn

# Positional command-line arguments; no validation is done here.
input_param1 = sys.argv[1]
input_param2 = sys.argv[2]


# NOTE(review): fo_name is not defined in this snippet -- it must be set
# (presumably derived from the input parameters) before this open() runs.
fo = open(fo_name, "w") 


def display_results_3(fo, cr_sqlsA, cr_sqlsB, cr_sqlsC, the_env, pw, sql_sqlS, sql_ora, param_list_sqlS, param_list_ora, title, num_cols_in_key, num_cols_to_compare, num_cols_to_add):
    """Run one query against up to three SQL Server instances plus Oracle
    and write the result sets and their differences to the HTML file fo.

    The third SQL Server instance (cr_sqlsC) is only queried when
    the_env == "PROD"; otherwise an empty result set stands in for it.
    NOTE(review): the title parameter is accepted but never used here.
    """
    res_sqlSA = ct_cn.run_query(cr_sqlsA, sql_sqlS, param_list_sqlS)
    res_sqlSB = ct_cn.run_query(cr_sqlsB, sql_sqlS, param_list_sqlS)
    if the_env=="PROD":
        res_sqlSC = ct_cn.run_query(cr_sqlsC, sql_sqlS, param_list_sqlS)
    else:
        # outside PROD there is no third instance; (columns, rows) both empty
        res_sqlSC = ([],[]) 

    # connect, query, and close, to prevent ORA timeout
    cnora = ct_cn.ora(the_env, pw)
    cr_ora = cnora.cursor()
    res_ora = ct_cn.run_query(cr_ora, sql_ora, param_list_ora)

    ct_write_html.display_results_of_two_sets_3instances(fo, res_sqlSA, res_sqlSB, res_sqlSC, res_ora, "eCC", "KC", "row count")

    # merge the three SQL Server result sets into one keyed result set
    res3 = ct_cn.put_three_in_one(res_sqlSA, res_sqlSB, res_sqlSC, num_cols_in_key, num_cols_to_add)

    ct_write_html.display_differences(fo, res3, res_ora, "eCC", "KC", num_cols_in_key, num_cols_to_compare, False)

    cnora.close()


empty_list = []


# NOTE(review): cnsqlsA and cnsqlsB are opened elsewhere in the full
# script; this fragment only shows the cleanup at the end.
cnsqlsA.close()
cnsqlsB.close()
fo.close()

# Windows-only: open the finished report with its default application.
os.startfile(fo_name)
print ("Completed")

 

 


Connections for Oracle

import pyodbc



def ora(env, pw):
    """Return a pyodbc connection to the Oracle DSN for *env*.

    env -- one of 'DV2', 'QA', 'PS', 'PROD'
    pw  -- password appended to the connection string (never printed)

    Raises ValueError for an unrecognized environment.  The original
    version only printed a message and fell through returning None,
    which made callers fail later with a confusing AttributeError.
    """
    print("Creating connection for env=" + env + " in ct_cn.ora")
    # Table-driven lookup replaces the if/elif chain.
    dsn_by_env = {
        'DV2':  'DATWDV2',
        'QA':   'DATWQA',
        'PS':   'DATWPS',
        'PROD': 'DATW',
    }
    try:
        dsn = dsn_by_env[env]
    except KeyError:
        raise ValueError("env=" + env + " not recognized in ct_cn.ora")
    return pyodbc.connect("DSN=" + dsn + ";PWD=" + pw)



def run_query(cr, sql_cmd, sql_param):
    """Execute sql_cmd on DB-API cursor cr with bind parameters sql_param.

    Returns a tuple (column_names, rows): column_names is the list of
    column labels taken from the cursor description, rows is the list of
    fetched rows.
    """
    cr.execute(sql_cmd, sql_param)

    # The column label is the first field of each DB-API description entry.
    col_desc = [d[0] for d in cr.description]

    # fetchall() replaces the manual row-append loop.
    # Fix: the stray debug print of the full result set is removed --
    # it flooded stdout for large queries.
    results = cr.fetchall()

    # put the column descriptions and the results together
    return (col_desc, results)
    

def print_results(col_2dimres):
    """Pretty-print a (column_names, rows) tuple as produced by run_query."""
    cols, rows = col_2dimres
    print("columns: ", cols)
    row_num = 1
    for one_row in rows:
        print("row", row_num, ": ", one_row)
        row_num += 1


def make_result_dict(r, num_key_cols):
    """Index query results by their leading key columns.

    r is a (column_names, rows) tuple as produced by run_query.  Returns
    a dict mapping the tuple of the first num_key_cols values of each
    row to the full row.  The column names are not carried over into the
    dictionary.
    """
    return {row[0:num_key_cols]: row for row in r[1]}

 

 


Home grown functions to write to a file

import csv


def write_results(filename, col_2dimres, src, delim_char=',', quote_char='"'):
    """Write a (column_names, rows) tuple to filename as delimited text.

    col_2dimres -- tuple of (column name list, row list), see ct_cn
    src         -- unused here; kept so existing callers keep working
    """
    with open(filename, 'w', newline='') as fo:
        lines_out = csv.writer(fo, delimiter=delim_char, quotechar=quote_char, quoting=csv.QUOTE_MINIMAL)
        # Bug fix: the header row must go through the csv writer -- the raw
        # file object has no writerow method (AttributeError before).
        lines_out.writerow(col_2dimres[0])
        for rw in col_2dimres[1]:
            lines_out.writerow(rw)
        # No explicit close needed: the with-block closes the file (the
        # original close(fo) was a NameError anyway).

 

 


Plot with Output to File

import numpy as np
import matplotlib
# Select the non-interactive Agg backend BEFORE importing pyplot so the
# figure can be rendered without a display (file output only).
backend = 'Agg'
matplotlib.use(backend)
import matplotlib.pyplot as plt

# A damped sine wave on [0, 1] (np.linspace default is 50 points).
x = np.linspace(0, 1)
y = np.sin(4 * np.pi * x) * np.exp(-5 * x)

plt.fill(x, y, 'r')
plt.grid(True)
#plt.show()
# With Agg the figure is written to a file ("glu.png" with the default
# format) rather than shown on screen.
plt.savefig('glu')

 

 


Plot with Output to Screen

import numpy as np
import matplotlib.pyplot as plt

x=np.arange(0.0, 2.1, 0.01)  # upper end of range: open

# Four trig curves over one period; axis() fixes the viewport.
plt.plot(x, np.sin(np.pi * x))
plt.plot(x, np.cos(np.pi * x))
plt.plot(x, -1* np.sin(np.pi * x))
plt.plot(x, -1* np.cos(np.pi * x))
plt.axis([0,2,-1.1,1.1])
plt.show()



# Assorted one-off plotting recipes (assumes matplotlib.pyplot is
# already imported as plt, as in the snippets above).

# Horizontal reference line.
# Bug fix: the pyplot alias used on this page is plt, not plot.
plt.axhline(305, linewidth=3, color='r')

import datetime as dt
import matplotlib.dates as mdates
# Bug fix: leading-zero integer literals (01, 11) are a SyntaxError in
# Python 3; plain integers are used instead.
dates = mdates.drange(dt.datetime(2010, 1, 1), dt.datetime(2012, 11, 1), dt.timedelta(weeks=3))


plt.xlabel('...', fontsize=14, color='red')
plt.ylabel('...')
plt.title('...')
plt.text(x, y, r'$text$')    # r indicates raw text; NOTE(review): x and y must already be defined
plt.axis([40, 160, 0, 0.03])
plt.grid(True)


plt.annotate('...', xy=(2, 1), xytext=(3, 1.5),
            arrowprops=dict(facecolor='black', shrink=0.05),
            )

 

 


Read Unicode Data

def readln(f):
    """Read one line from file object f; return "" if the bytes cannot be decoded."""
    try:
        return f.readline()
    except UnicodeDecodeError:
        # An undecodable line is reported as empty, just like EOF.
        return ""

 

 


Read File

# read file and count the row content
# creates a dictionary of all line values and counts the frequency

import os

fo2 = open("list_of_error_counts.txt", 'w')

f = "C:\\Users\\Z0242594\\Documents\\ct_files\\sql\\datw\\proc_adm_20151216_load_all_errors.txt"
print("Reading:", f, type(f))   # bug fix: Python 3 print function (was a Python 2 print statement)

ff = open(f, 'r')
li = ff.readline()

line_count = 0                  # bug fix: was incremented below without ever being initialized
distinct_line_count = {}
while len(li) > 0:
    line_count += 1
    # repr() keeps newlines and other escapes visible in the dictionary key
    if repr(li) in distinct_line_count:
        distinct_line_count[repr(li)] += 1
    else:
        distinct_line_count[repr(li)] = 1

    li = ff.readline()

ff.close()

# one output line per distinct input line, with its frequency
for d in distinct_line_count:
    fo2.write(d + ": " + str(distinct_line_count[d]) + '\n')

fo2.close()

 

 


Read File

# Reads a file and looks at the lines
# counts rows and ignores empty lines and lines with "1 row created."
# outputs all other rows to a file

import os


# Widest value seen per column (18 columns); all start at zero.
max_col_len = [0] * 18
print(max_col_len)              # bug fix: Python 3 print function (was a Python 2 print statement)



fo = open("list_of_errors.txt", 'w')


f = "C:\\Users\\Z0242594\\Documents\\ct_files\\sql\\datw\\proc_adm_20151216_load_all_errors.txt"
print("Reading:", f, type(f))

ff = open(f, 'r')
li = ff.readline()
line_count = 0
line_ct_ok = 0
line_ct_err = 0

while len(li) > 0:
    line_count += 1             # bug fix: the total line count was never incremented
    if li == "\n":
        pass                    # ignore empty lines
    elif li[0:14] == "1 row created.":
        line_ct_ok += 1
    else:
        fo.write(li)            # anything else is an error line
        line_ct_err += 1

    li = ff.readline()

ff.close()
print(f, " has ", line_count, " lines")
print(f, " has ", line_ct_ok, " rows loaded")
print(f, " has ", line_ct_err, " errors")


fo.close()

 

 


Read File

# Loops through all files in a folder and treats "*.txt" and "*.sql" files

import os

# NOTE(review): src_dir must be defined before this snippet runs.
for filename in os.listdir(src_dir):
    # endswith accepts a tuple of suffixes, matching either extension
    if filename.endswith((".txt", ".sql")):
        full_file_name = src_dir + "\\" + filename

 

 


create insert statements from file

# Build INSERT statements from every pipe-delimited .txt file in the
# current directory and write them to one SQL script.

import os

fo = open("proc_adm_20151216_insert_stmts.sql", 'w')

fo.write('\n')
fo.write('set define off\n')                  # turn off define so '&' in the data is not treated as a variable
fo.write('\n')

for f in os.listdir(os.getcwd()):
    if f.endswith(".txt"):                    # idiom: clearer than f[-4:len(f)] == ".txt"
        # Bug fix: Python 3 print function (was a Python 2 print
        # statement), and the undefined name date_sep was dropped from
        # the message -- it was never defined or used in this snippet.
        print("Reading:", f, type(f))

        skip_first = True                     # the first line is the header row
        ff = open(f, 'r')
        li = ff.readline()
        line_count = 0
        while len(li) > 0:
            line_count += 1
            if skip_first:
                skip_first = False
            else:
                liel = li.split('|')
                # build one INSERT statement from the pipe-separated fields
                stmt = "insert into diagnosis_corr_20151216 (MRN, Clinic, OldStatus, HistoryRecor)"
                stmt += " values ("
                stmt += "to_date('" + liel[7] + "', 'MM/DD/YYYY HH:MI:SS AM'), "     # date
                stmt += "'" + liel[9].replace("'", "''") + "', "           # handle single quotes
                stmt += "'" + f + "' "                                     # add the file name too
                stmt += ");"
                fo.write(stmt + '\n')

            li = ff.readline()
        ff.close()

        # commit after each input file
        fo.write('\n')
        fo.write('commit;\n')
        fo.write('\n')

fo.close()

 

 


SQLite

import sqlite3

# Basic sqlite3 usage: connect, run a parameterized query, commit, close.
db_name = "test_sql2cvs.db"
conn = sqlite3.connect(db_name)
cr = conn.cursor()

# '?' placeholders with a tuple of bind values.
p = (3, 6,)
cr.execute('select * from a where i in (?,?) ', p)
print([col_desc[0] for col_desc in cr.description]) # column descriptions
for one_row in cr:
    print(one_row)

# Placeholder: define sql_cmd (some DML) before executing it.
#cr.execute(sql_cmd)

# Bug fix: the connection variable is named conn, not cn (the original
# cn.commit()/cn.close() raised NameError).
conn.commit()
conn.close()

 

 


SQLite to CSV

import sqlite3
import csv
import time
# https://docs.python.org/3.4/library/csv.html

db_name = "test_sql2cvs.db"
conn = sqlite3.connect(db_name)
cr = conn.cursor()

fn_out = "test_sql2cvs.csv"

print("reading database " + db_name + " and inserting into " + fn_out)

cr.execute('select * from a')

print(cr.description)
print([col_desc[0] for col_desc in cr.description])  # get the first in each description

# newline='' lets the csv module control the line endings
with open(fn_out, 'w', newline='') as fo:
    lines_out = csv.writer(fo, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    lines_out.writerow([col_desc[0] for col_desc in cr.description])
    for one_row in cr:
        lines_out.writerow(one_row)
        print(one_row)

# Bug fix: p was never defined in this snippet; the bind values must
# exist before the parameterized query runs.
p = (3, 6)
cr.execute('select * from a where i in (?,?) ', p)
for one_row in cr:
    print(one_row)

conn.close()   # bug fix: the connection was never closed




# # Never do this -- insecure!
# symbol = 'glu'
# c.execute("select * from stocks where symbol = '%s'" % symbol)
  
# Do this instead
# NOTE(review): 'c' is a cursor and the 'stocks' table must exist;
# both are assumed to be set up earlier.
t = ('glu',)
c.execute('select * from stocks where symbol=?', t)

# Larger example: executing the same statement with several bind tuples
for t in [('2006-03-28', 'BUY', 'IBM', 1000, 45.00),
          ('2006-04-05', 'BUY', 'MSFT', 1000, 72.00),
          ('2006-04-06', 'SELL', 'IBM', 500, 53.00),
         ]:
    c.execute('insert into stocks values (?,?,?,?,?)', t)

 

 


Start of File

import json, sqlite3, sys
import pandas as pd
# Make the project-local collibra helper modules importable.
sys.path.append("..\\ingit\\DG-py-collibra-rest-api\\core")
import utilities
import clb_functions



# Require two positional arguments (argv[0] is the script name itself).
if len(sys.argv)<3:
    print("Two arguments are required: the file of posts and the file of votes.  Include the path")
    sys.exit(1)
else:
    posts_file_in=sys.argv[1]
    votes_file_in=sys.argv[2]

 

 


Open Database

# Skeleton for working with an already-opened database connection.
# NOTE(review): "...." is a placeholder for the real work, and db_conn /
# db_name must be defined earlier; this fragment does not run as-is.
if db_conn:
    ....
    #close database connection at end
    db_conn.close()
else:
    print("Could not open the database '" + db_name + "'")
print("...done")

 

 


Safe File Name

# Whitelist of characters allowed to appear in a generated file name.
SAFE_CHAR_FOR_FILENAME="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789.-_"

def safe_string(s):
    """Return s with every character outside the whitelist removed; "" for falsy input."""
    if not s:
        return ""
    return ''.join(ch for ch in s if ch in SAFE_CHAR_FOR_FILENAME)
# Example usage -- replace ... with the raw name to sanitize.
file_name=safe_string(...)

 

 


Split Dataframe

# split the dataframe into smaller pieces.  The result is a "group" of dataframes
# NOTE(review): all_data, write_to_file, output_filebasename and
# safe_string come from other snippets on this page.
for g in all_data.groupby(['env', 'domain_name', 'ast_type']):
    # Index is needed for the unstack
    r = g[1].set_index(['env', 'domain_id', 'domain_name', 'ast_type', 'ast_full_name', 'ast_display_name', 'ast_uuid', 'ast_descr', 'ast_rec_ty', 'char_type'])
    # g[0] has the values of the index, g[1] has the smaller dataframes
    write_to_file(r.unstack(level=-1), None, output_filebasename, "_" + safe_string("_".join(g[0])))

 

 


Add Timestamp to File Name

import time
# Single strftime call so the date and the time come from the same
# instant (the original made two calls, which could straddle midnight
# and produce an inconsistent stamp).
curr_dttm = time.strftime("%Y%m%d_%H%M%S")
output_filebasename += "_" + curr_dttm

 

 


Append Dataframes Vertically

the_sql = """select ast.env  """
just_assets = pd.read_sql_query(the_sql , db_conn, params=the_params)
write_to_file(just_assets, the_sql, output_filebasename, "_raw_ast")
print("Loaded list of all assets")

the_sql = """select ast.env ..."""
attributes = pd.read_sql_query(the_sql , db_conn, params=the_params)
write_to_file(attributes, the_sql, output_filebasename, "_raw_att")
print("Number of attributes:",str(len(attributes)))

the_sql = """select ast.env...              """
relations_to = pd.read_sql_query(the_sql , db_conn, params=the_params)
print("Number of relations to:",str(len(relations_to)))

# join all, vertically
# Fix: DataFrame.append was deprecated in pandas 1.4 and removed in 2.0;
# pd.concat is the supported way to stack frames vertically.
# NOTE(review): relations_from is not loaded in this snippet -- it must
# be built the same way as relations_to before this line runs.
all_data = pd.concat([just_assets, attributes, relations_to, relations_from],
                     ignore_index=True, sort=False)
#print(str(len(attributes))+'+'+str(len(relations_from))+'+'+str(len(relations_to))+'='+str(len(all_data)),str(len(attributes)+len(relations_from)+len(relations_to)==len(all_data)))
write_to_file(all_data.set_index(['env', 'domain_name', 'domain_id', 'ast_type', 'ast_full_name', 'ast_uuid', 'char_type']), None, output_filebasename, "_raw_all")

 

 


Format Input That Might Have Double Quotes

# Strip optional outer double quotes from argv[3], split the
# comma-separated list, then strip inner quotes from each element.
tmp = sys.argv[3].strip('"').strip()  # strip outside double quotes
tmp_list = tmp.split(',')   # bug fix: was splitting the undefined name tmp_file_name_out
out_list = []
for f in tmp_list:
    out_list.append(f.strip('"').strip())   # bug fix: was appending to the undefined file_name_out_list

 

 


Config File

# Bug fix: the module is named configparser -- the original imported
# from "configparsert" (typo), which raises ModuleNotFoundError.
from configparser import ConfigParser

config = ConfigParser()
config.read('setup.cfg')   # read() silently skips files that do not exist

# Dump every section and its key/value pairs.
print(config.sections())
for sct in config.sections():
    for k, i in config[sct].items():
        print(sct, ": ", k, "=", i)

 


def config_file_to_dct(config_file):
    """Parse an INI-style file into a nested dict {section: {key: value}}."""
    parser = ConfigParser()
    parser.read(config_file)
    return {sct: dict(parser[sct].items()) for sct in parser.sections()}
 
def print_config(config_dct):
    """Print every setting of a nested {section: {key: value}} config dict."""
    for section, settings in config_dct.items():
        for key, value in settings.items():
            print(f"section '{section}': '{key}' = '{value}'")

# NOTE(review): config_file must be defined before this snippet runs.
config_dct = config_file_to_dct(config_file)
print_config(config_dct)
# Bug fix: look the setting up in the parsed dict, not by subscripting
# the function object (which raised TypeError).
a_setting = config_dct["section"]["element"]

 

 


Write to File

def write_to_file(df, sql, basename, sfx):
    """Write dataframe df to basename+sfx+".csv"; optionally save the query.

    df       -- pandas DataFrame to dump (index and header are written)
    sql      -- the query text that produced df, or None/"" to skip it
    basename -- output path prefix
    sfx      -- suffix inserted before the file extension
    """
    csv_name = basename + sfx + ".csv"
    df.to_csv(csv_name, index = True, header=True, sep=",")
    print("wrote to ", csv_name)
    # If the SQL query was given, save it in a sibling .sql file.
    if sql:
        # with-statement guarantees the file is closed even on error
        with open(basename + sfx + ".sql", "w", encoding="utf-8") as f:
            f.write(sql)

 

 


List

# One permission record per role; every record carries the same user id.
# NOTE(review): user_roles and user_id come from earlier in the script.
perm_list = [
    {"usr_id": user_id, "grp_id": r.get("own_id")}
    for r in user_roles
]

 

 


Json

import json
# NOTE(review): something_complex, x and f are placeholders to be
# defined by the caller; f is an open file object.
json.dumps(something_complex, indent=4) # serializes the object to a JSON string (indent=4 pretty-prints)
json.dump(x, f) # serializes and writes to file f
x = json.load(f) # reads it back from file f

 

 


Logging and Configuration

import logging
import configparser

# Placeholder paths -- point these at the real credential files.
config_file_aws = "/. . ./.aws/credentials"
config_file_snf = "/. . ./.snfl.txt"

# AWS credentials come from the standard ~/.aws/credentials INI file.
config_aws = configparser.ConfigParser()
config_aws.read(config_file_aws)
aws_access_key=config_aws.get("default","aws_access_key_id")
aws_secret_key=config_aws.get("default","aws_secret_access_key")

# Snowflake credentials come from a private INI-style file.
config_snf = configparser.ConfigParser()
config_snf.read(config_file_snf)

      ########## contents of config file
      # [main]
      # u=my_user_name
      # p=my_super_secret_pw
      ##########

# NOTE(review): the sample above only shows a [main] section, but
# sf_account reads from an [acct] section -- confirm the real file has both.
sf_account = config_snf.get("acct","a")
sf_user = config_snf.get("main","u")
sf_pw   = config_snf.get("main","p")

# Log to a file named after this module; switch the level to DEBUG when needed.
logging.basicConfig( filename=__name__ + ".log"
                   , level=logging.INFO
                   #, level=logging.DEBUG
                   , format="%(asctime)s:%(levelname)s:%(name)s:%(filename)s:%(module)s:%(funcName)s:%(lineno)s:%(msg)s"
                   )

 

 


Snowpark (1)

from snowflake.snowpark import Session
from snowflake.snowpark.functions import when

def exec_qry(s, sql):
    # Log and echo the SQL, then return the (lazy) Snowpark dataframe.
    # NOTE(review): relies on the logging configuration set up in the
    # previous snippet on this page.
    logging.info(f"About to execute:\n====\n{sql}\n=====")
    print(f"About to execute:\n====\n{sql}\n=====")
    df = s.sql(sql)
    logging.debug("Completed execution of query")
    return df

# Build a Snowpark session from the credentials loaded earlier on this
# page (sf_account, sf_user, sf_pw come from the configparser snippet).
params = { "account":   sf_account
         , "user":      sf_user
         , "password":  sf_pw
         , "warehouse": "wsmall"
         , "database":  "first"
         }
s = Session.builder.configs(params).create()

# Sanity check: ask Snowflake for its version.
df = exec_qry(s, "select current_version() as v")
df.show()

qry = "SELECT * FROM exer.firstb"
df = exec_qry(s, qry)
df.show(3)

# show
print("distinct countries: ",df.select(["CNTRY_NM", "CNTRY_CD"]).distinct().count(), sep="")


# Country or region code: fall back to the country name when the code is null
df = df.withColumn("CRCD", when(df["CNTRY_CD"].isNull(), df["CNTRY_NM"]).otherwise(df["CNTRY_CD"]))

df_countries = df.select(["CNTRY_NM", "CNTRY_CD", "CRCD"]).distinct().na.fill("MULTIPLE", subset=["CNTRY_CD"])

df_crcd = df.select(["CRCD", "YEAR"]).distinct()
df_crcd.show(30)

# avg funding received
df_avg_funding_rcvd = df.filter(df["METRIC"] == "Funding received").groupBy(["YEAR", "CRCD"]).avg("VALUE").withColumnRenamed("AVG(VALUE)", "AVG_FUNDING")
print("avg funding received")
df_avg_funding_rcvd.show()

# avg people targeted
df_avg_tgt_peo = df.filter((df["METRIC"] == "People in need") | (df["METRIC"] == "People targeted")).groupBy(["YEAR", "CRCD"]).avg("VALUE").withColumnRenamed("AVG(VALUE)", "AVG_PEOPLE")
print("avg people in need or targeted")
df_avg_tgt_peo.show()

# all Mali
df.filter(df["CNTRY_NM"] == "Mali").show()


# count values (profiling)
# NOTE(review): this block is disabled with "if False:".  To enable it,
# replace that line with the commented for-loop so that c is defined;
# as written, c would be a NameError if the block ever ran.
if False:
#for c in df.columns:
    dfc = df.groupBy(c).count()
    # print(c, ": ", df.select(c).distinct().count(), sep="")   this is the same as next
    print(c, ": ", dfc.count(), sep="")
    dfc.orderBy(dfc["COUNT"].desc()).show(5)


dfall = df_crcd.join(df_avg_funding_rcvd, on=["CRCD", "YEAR"], how="left").join(df_avg_tgt_peo, on=["CRCD", "YEAR"], how="left")

# remove rows where either people or funding is null
dfall = dfall.na.drop()
dfall = dfall.filter(dfall["AVG_FUNDING"] != 0).filter(dfall["AVG_PEOPLE"] != 0)
dfall = dfall.withColumn("AVG_FD_PER_PERS", dfall["AVG_FUNDING"]/dfall["AVG_PEOPLE"])


# Persist the derived tables, then read one back to verify.
dfall.write.mode("overwrite").save_as_table("exer.dfall")
df_avg_funding_rcvd.write.mode("overwrite").save_as_table("exer.average_funding_received")
df_avg_tgt_peo.write.mode("overwrite").save_as_table("exer.average_tgt_peo")
qry = "SELECT * FROM exer.average_tgt_peo"
df = exec_qry(s, qry)
df.show()

logging.info(". . .done")


 

 


boto3 (AWS) 1

For an example of how to get the access key id and the secret key, see the configparser snippet in python_notes.html

List S3 Buckets

import boto3
from botocore.exceptions import ClientError

def list_s3():
    # Print the name of every S3 bucket visible to the given credentials.
    # NOTE(review): aws_access_key / aws_secret_key are loaded by the
    # configparser snippet earlier on this page.
    try:
        s3_resource = boto3.resource( 
              "s3"
            , aws_access_key_id     = aws_access_key   
            , aws_secret_access_key = aws_secret_key
            )
    except ClientError as e:
        print("Security token not set correctly (not authenticated correctly)")
        raise e

    print("=========== List of buckets:")
    for bucket in s3_resource.buckets.all():
        print(bucket.name)
    print("=========== End of bucket list")

if __name__ == "__main__":
    list_s3()

Get a Secret from the Secrets Manager

import boto3
from botocore.exceptions import ClientError

def get_secret():
    # Fetch the secret named "thefirst" from AWS Secrets Manager in
    # us-east-1 and return its SecretString payload.
    # NOTE(review): aws_access_key / aws_secret_key come from the
    # configparser snippet earlier on this page.

    session = boto3.session.Session()
    client = session.client( service_name          = "secretsmanager"
                           , region_name           = "us-east-1"
                           , aws_access_key_id     = aws_access_key
                           , aws_secret_access_key = aws_secret_key
                           )

    try:
        secret_value_response = client.get_secret_value(SecretId = "thefirst")
    except ClientError as e:
        print("Security token not set correctly (not authenticated correctly)")
        raise e

    # secret_value_response is a dictionary
    #print("the response", str(secret_value_response))
    print("============= The values retrieved")
    for k, v in secret_value_response.items():
        print(f"{k}: {v}")
    print("============= end of values retrieved")

    the_secret = secret_value_response["SecretString"]  # the value for "SecretString" looks like a dictionary
                                                        # but it is a string in format {"k":"v"}
    print("the name is ",secret_value_response["Name"]) # the name, same as the SecretId above in the "get_secret_value" call

    return the_secret

if __name__ == "__main__":
    print("found: ",get_secret())




Get a parameter from Parameter Store
<config_prefix>/my-config-section/my-setting:
my-config-section:
  my-setting: my value

import os
import boto3


# NOTE(review): requires AWS credentials/region from the environment or
# the ~/.aws config; WithDecryption=True decrypts SecureString values.
ssm = boto3.client('ssm')
parameter = ssm.get_parameter(Name=" name here ", WithDecryption=True)

print(parameter['Parameter']['Value'])

https://docs.aws.amazon.com/systems-manager/latest/userguide/systems-manager-parameter-store.html

 

 


Clever Tricks


Count, and show "fizz" every 5 and "buzz" every 3

# Count 0-19, printing "Fizz" for multiples of 5, "Buzz" for multiples
# of 3, and "FizzBuzz" for multiples of both (this page's heading swaps
# the classic fizz/buzz numbers on purpose).
for num in range(20):
    if num % 5 == 0 and num % 3 == 0:
        print("FizzBuzz")
    elif num % 5 == 0:
        print("Fizz")
    elif num % 3 == 0:          # bug fix: this was a separate "if", so numbers
        print("Buzz")           # divisible by 5 but not 3 printed both "Fizz"
    else:                       # and the number itself
        print(str(num))

 

Fibonacci sequence

# Fibonacci
# Tuple assignment advances the pair: a holds fib(i), b holds fib(i+1).
a, b = 0, 1
for i in range(10):
    print(i, ":", a)
    a, b = b, a + b

 

Fibonacci with generator

def fib(num):
    """Yield (index, value) pairs of the first num Fibonacci numbers."""
    prev, curr = 0, 1
    for idx in range(num):
        yield idx, prev        # generator: "yield" instead of "return"
        prev, curr = curr, prev + curr

[f for f in fib(10)]