Python Standard Library

Data Types

Collections module

The collections module provides specialized data structures.

Note that defaultdict is all lower case, and not camel case, for historical reasons.

Bisect

import bisect

xs = ['a', 'b', 'c', 'p']
bisect.bisect_left(xs, x, lo=0, hi=len(xs), key=None)
bisect.bisect_left(xs, 'b')  # 1
bisect.bisect_left(xs, 'd')  # 3
bisect.bisect_left(xs, 'z')  # 4

# bisect_left(xs, x):  [ < x | x | x <= ]
# bisect_right(xs, x):  [ <= x | x | x < ]

# insert into sorted list
bisect.insort(xs, 'd')  # equivalent to insort_right

Heapq

import heapq
# uses a min-heap

q = []
heapq.heappush(q, value) # value = 'k', or = (3, 'k')
heapq.heapify(ls) # construct heap in linear time

item = heapq.heappop(q)
smallest = q[0] # effectively 'peek'

x = heapq.heappushpop(q, v) # faster than heappush() + heappop()

Enum

from enum import Enum


class Color(Enum):
    """Example enumeration with three members."""

    RED = 1
    GREEN = 2
    BLUE = 3


# Member names are case-sensitive: the members are RED/GREEN/BLUE, so
# `Color.red` would raise AttributeError.
type(Color.RED)  # <enum 'Color'>
isinstance(Color.GREEN, Color)  # True
Color.RED is Color.RED  # True
Color.RED is Color(1)  # True -- lookup by value returns the same member
Color.RED.name  # 'RED'
Color.RED.value  # 1
len(Color)  # 3
list(Color)  # [<Color.RED: 1>, <Color.GREEN: 2>, <Color.BLUE: 3>]

If the enums are required to be integers

from enum import IntEnum

# IntEnum is defined as:
class IntEnum(builtins.int, Enum):
    ...

Dates and Time

from datetime import datetime, date, time, timedelta, UTC

dt = datetime.now(tz=UTC)
date = dt.date()
date = date(2020,7,1)
date += timedelta(days=1) # days=0, seconds=0, minutes=0, hours=0, weeks=0
dt.year
dt.month
dt.day
dt.hour
dt.minute
dt.second
dt.tzinfo

Classes in the datetime module

The date, datetime, time, and timezone types share these common features:

# `datetime` is the class brought in by `from datetime import datetime`,
# so strptime is called on it directly rather than through a `dt.` prefix.
date_time = datetime.strptime('Jun 1 2005  1:33PM', '%b %d %Y %I:%M%p')
sdt = "202007011330"  # example input in YYYYmmddHHMM form
datetime.strptime(sdt, "%Y%m%d%H%M")

datetime.now().replace(microsecond=0).isoformat()

from dateutil import parser
date_time = parser.parse("4th of July, 2015")

Docs: date string format.

Time

import time

time.sleep(3) # seconds

graphlib

from graphlib import TopologicalSorter

# Maps each node to the set of nodes that must come before it.
graph = {"C": {"A"}, "B": {"A"}}
sorter = TopologicalSorter(graph)
precedents = ["B", "C"]
sorter.add("D", *precedents)

# static_order() returns an iterator, so materialize it to see the order
order = list(sorter.static_order())
# ['A', 'C', 'B', 'D']  (the relative order of 'B' and 'C' is not fixed)
# raises CycleError if any cycles

Numeric and Mathematical Modules

The numbers module defines ABCs such as:

math

cmath

Complex math.

decimal

Exact precision decimals.

from decimal import Decimal as D
error = D('3.14') - D(3.14) # 3.14 doesn't have an exact IEEE-754 binary floating-point representation
ln10 = D('10').ln() # natural logarithm of 10

fractions

from fractions import Fraction as F

F(1, 10) + F(1, 10) + F(1, 10) == F(3, 10)

random

from random import randint, shuffle

randint(a,b) # from [a,b] incl
shuffle(list(range(9)))

statistics

fmean() geometric_mean() harmonic_mean()
median() mode()
stdev() variance()
NormalDist(mu=0.0, sigma=1.0)
    from_samples(data)
    samples(m, seed=None)
    pdf(x)
    cdf(x)
    inv_cdf(x)
    zscore(x)

Functional Programming Modules

functools

For higher-order functions: functions that act on or return other functions.

@functools.cached_property(func)
functools.cmp_to_key(func)
@functools.lru_cache(user_function)
@functools.lru_cache(maxsize=128, typed=False)
func.cache_info() # to see hits and misses
@functools.total_ordering
functools.partial(func, /, *args, **keywords)
class functools.partialmethod(func, /, *args, **keywords)
functools.reduce(function, iterable[, initializer])

# Generic function: this body is the fallback for types with no registered implementation.
@functools.singledispatch
def fun(x):
    print(x)

# Stacked register() calls attach the same implementation to both int and float.
@fun.register(int)
@fun.register(float)
def _fun_nr(x):
    print("nr", x)

fun.dispatch(int) # <function _fun_nr at ...>
fun.dispatch(str) # <function fun at ...>
fun.registry # mappingproxy

# Update a wrapper func to look like the wrapped func (preserve doc string etc)
@functools.wraps(wrapped, assigned=WRAPPER_ASSIGNMENTS, updated=WRAPPER_UPDATES)

def decorator(f):
    """Wrap *f* in a pass-through function that keeps f's metadata."""

    def decorated(*args, **kwargs):
        # Forward every positional and keyword argument untouched.
        result = f(*args, **kwargs)
        return result

    # Calling wraps(f) on the wrapper is what the @functools.wraps(f) form does.
    return functools.wraps(f)(decorated)

partial has early-binding, while lambda has late-binding

from functools import partial

add = lambda x, y: x + y
n = 1
# The lambda looks `n` up each time it is called (late binding) ...
incr_lambda = lambda y: add(n, y)
# ... while partial stores the value `n` has right now (early binding).
incr_partial = partial(add, n)
incr_lambda(0), incr_partial(0) # 1, 1
n = 2
incr_lambda(0), incr_partial(0) # 2, 1

itertools

Docs

Infinite iterators:

count(10, step=1) --> 10 11 12 13 14 ...
cycle('ABCD') --> A B C D A B C D ...
repeat(10, 3) --> 10 10 10   (endless when the count is omitted)

Iterators terminating on the shortest input sequence:

accumulate([1,2,3,4,5], func=add, initial=10) --> 10 11 13 16 20 25
batched('ABCDEFG', n=3) --> ABC DEF G
chain('ABC', 'DEF') --> A B C D E F
chain.from_iterable(['ABC', 'DEF']) --> A B C D E F
compress('ABCDEF', [1,0,1,0,1,1]) --> A C E F
dropwhile(lambda x: x<5, [1,4,6,4,1]) --> 6 4 1
filterfalse(lambda x: x%2, range(10)) --> 0 2 4 6 8
groupby(sorted_data, keyfunc) --> (key, group) iterable
islice('ABCDEFG', 2, None, 1) --> C D E F G   (start, stop, step are positional-only)
pairwise('ABCDEFG') --> AB BC CD DE EF FG
starmap(pow, [(2,5), (3,2), (10,3)]) --> 32 9 1000
takewhile(lambda x: x<5, [1,4,6,4,1]) --> 1 4
tee([1,2,3], n=2) --> ([1,2,3], [1,2,3])
zip_longest('ABCD', 'xy', fillvalue='-') --> Ax By C- D-

Combinatoric iterators:

product('ABC', repeat=2) --> AA AB AC BA BB BC CA CB CC
permutations('ABC', 2) --> AB AC BA BC CA CB
combinations('ABC', 2) --> AB AC BC
combinations_with_replacement('ABC',2) --> AA AB AC BB BC CC

Typing

from typing import Any, TypeVar, cast

# Constrained type variable: each use of NT resolves to int or to float.
NT = TypeVar("NT", int, float)

# typing.Callable[[NT, NT], NT]
# NOTE: this name shadows the built-in sum() within this snippet.
def sum(a: NT, b: NT) -> NT:
    """Return a + b; both arguments and the result share the type NT."""
    return a + b


xs: list[Any] = []  # a value annotated as list[Any], not the type itself
ys = cast(list[str], xs)  # cast() returns xs unchanged at runtime

NamedTuple

Creates namedtuple classes. These are immutable.

from typing import NamedTuple

class Person(NamedTuple):
    """Record with three required fields and one defaulted field."""
    name: str
    age: int
    height: float
    country: str = "ZA"  # used when country is omitted

issubclass(Person, tuple) == True
p = Person("Joe", 25, 1.7)  # country is omitted here, so it takes the default "ZA"
p.name

Runtime Services

Abstract Base Classes

from abc import ABC, abstractmethod

class Base(ABC):
    """Abstract base declaring foo() as abstract and providing a concrete bar()."""
    @abstractmethod
    def foo(self):
        pass

    def bar(self): # concrete
        return "y"

class SubClass(Base):
    """Subclass of Base that supplies foo()."""
    def foo(self):
        # builds on the inherited bar(): "y" + "x"
        return self.bar() + "x"

Data Classes

Automatically adds generated special methods such as __init__() and __repr__() to user-defined classes.

from dataclasses import dataclass

# The bare decorator below is shorthand for spelling out these defaults:
#   @dataclass(init=True, repr=True, eq=True, order=False, unsafe_hash=False, frozen=False, slots=False)
# Use one form or the other; stacking both would apply dataclass() twice.
@dataclass
class C:
    """Line item with a name, a unit price and a quantity."""

    name: str
    price: float
    quantity: int = 0 # with default value

    def total_cost(self) -> float:
        """Return price multiplied by quantity."""
        return self.price * self.quantity
from dataclasses import dataclass, field

@dataclass
class C:
    mylist: list = field(default_factory=list)

contextlib

from contextlib import suppress

with suppress(FileNotFoundError):
    os.remove('somefile.tmp')

Text Processing

String

Built-in Constants

import string
string.ascii_letters
string.ascii_lowercase
string.ascii_uppercase
string.digits
string.hexdigits
string.octdigits
string.punctuation
string.printable
string.whitespace

Regex

import re

regex = re.compile(pattern, re.IGNORECASE) # -> Pattern
# pos = start of string, endpos = end of string
search = regex.search(string, pos=0) # scans through the whole string
match = regex.match(string, pos=0) # scans only the beginning of the string
match = regex.fullmatch(string) # the whole string must match


s = "123abc"
m = re.match("[a-z]+", s) # None
m = re.search("[a-z]+", s) # "abc"
m = re.fullmatch("[0-9a-z]+", s) # -> Match
m = re.fullmatch("[0-9a-z]+", s + ".") # -> None

# m is a match object
m.group(0) # "abc"
m.group("name")

re.sub(pattern, replacement, string)
re.split(pattern, string, maxsplit=0)
re.split(r'\W+', 'a, bc, d.') # ['a', 'bc', 'd', '']
re.findall(r'\W+', 'a! bc; d.') # ['! ', '; ', '.']

\w is [a-zA-Z0-9_] if ASCII flag

Using the scanner

Templates

from string import Template

templ = Template("$x, $y")
s1 = templ.substitute(x=1, y=2)
s2 = templ.substitute({"x": 1, "y": 2})

Files and Data

csv

import csv

# need to specify newline='', else
# - newlines embedded inside quoted fields will not be interpreted correctly
# - for "\r\n" line-endings, on write, an extra \r will be added
with open('f.csv', newline='') as f:
    reader = csv.reader(f, delimiter=',', quotechar='"')
    # OR reader = csv.DictReader(f, fieldnames=None)
    for row in reader:
        print(', '.join(row))

with open('f.csv', 'w', newline='') as f:
    writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    writer.writerow(some)
    writer.writerows(someiterable)

with open('f.csv', 'w', newline='') as csvfile:
    fieldnames = ['a', 'b']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    writer.writeheader()
    writer.writerow({'a': 1, 'b': 2})

# csv.DictReader(f, fieldnames=None, restkey=None, restval=None, dialect='excel', *args, **kwds)

json

string =  '{ "name":"JK", "age":30}'
dictionary = json.loads(string)
string = json.dumps(obj, indent=4, sort_keys=True)

contents = json.load(file_obj)
json.dump(obj, file_obj)

Data Compression and Archiving

gzip

import gzip

content = b"Lots of content here"

c = gzip.compress(content)
m = gzip.decompress(c)

with gzip.open('f.txt.gz', 'wb') as f:
    f.write(content)

with gzip.open('f.txt.gz', 'rb') as f:
    content = f.read()

# compress an existing file
import shutil  # copyfileobj lives in shutil, which this snippet had not imported

# Both files are opened in one `with`, so each is closed even if the copy fails.
with open('f.txt', 'rb') as f_in, gzip.open('f.txt.gz', 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)

zipfile

from zipfile import ZipFile

# will create the file if not exists
with ZipFile("out.zip", 'w') as zip:
    for f in files:
        zip.write(f)
    zip.writestr("name", str_or_bytes)

File and Directory Access

os.path

os.getcwd()
os.listdir(path) --> files, directories
    # path = ".." or "/" to go up a directory
    # path = ".", for current directory
os.walk(path) --> dirpath, dirnames, filenames; recursive
os.path.isfile(path)
os.path.join(path1, path2, ...)
os.path.split(path)
os.path.abspath(relpath)
os.makedirs(folder, exist_ok=True)

pathlib

https://docs.python.org/3/library/pathlib.html

from pathlib import Path
path = Path(".") # Path() with no arguments also means the current directory
path.iterdir() # if path is dir, yields path objects of contents
path.glob(pattern)
path.rglob(pattern) # recursive glob, like path.glob(f"**/{pattern}")
path.joinpath(*path_fragments)
path / 'newdir' / 'newfile.txt' # joins
path.is_dir()
path.is_file()
path.exists()
path.mkdir(parents=False, exist_ok=False)
path.unlink(missing_ok=False) # removes file
path.with_suffix(".txt") # (a.txt).with_suffix(".gz") -> a.gz
path.relative_to(ancestor)

# Path("dir/hi.py")
path.name # "hi.py"
path.suffix # ".py"
path.stem # "hi"
path.parent
path.parts # ('dir', 'hi.py')

path.read_text(encoding=None, errors=None)
path.write_text(data, encoding=None, errors=None, newline=None)

shutil

import shutil as sh

sh.copytree(s, d, dirs_exist_ok=True)
sh.copyfile(s, d)
sh.move(s, d)
sh.rmtree(o)
sh.copyfileobj(file_obj_src, file_obj_dst)

tempfile

import tempfile

with tempfile.TemporaryFile(mode='w+b') as temp:
    # use mode='w+t' for text, else uses binary mode
    temp.writelines([])  # write through the handle bound by the `with`

with tempfile.NamedTemporaryFile() as temp:
    print(temp.name)

# held in memory, and written to file after max_size
spooled = tempfile.SpooledTemporaryFile(max_size=1000,
                                        mode='w+t',
                                        encoding='utf-8')

# is named
tempfile.TemporaryDirectory()

Internet Protocols and Support

import gzip
from urllib.request import Request, urlopen

a = "https://www.example.com"  # Request() needs a full URL including the scheme
req = Request(a)
req.add_header('Accept-Encoding', 'gzip')

with urlopen(req) as response:
    body = response.read()
    # We asked for gzip, so undo it when the server actually applied it.
    if response.info().get('Content-Encoding') == 'gzip':
        body = gzip.decompress(body)
    # get_content_charset() returns None when the response declares no charset.
    charset = response.info().get_content_charset() or 'utf-8'
    content = body.decode(charset)

Async

Usually there is one stack per thread. In asyncio, each thread has an object called an Event Loop. The event loop contains within it a list of objects called Tasks. Each Task maintains a single stack, and its own execution pointer as well.

At any one time the event loop can only have one Task actually executing, whilst the other tasks in the loop are all paused. The currently executing task will continue to execute exactly as if it were executing a function in a normal (synchronous) program, right up until it gets to a point where it would have to wait for something to happen before it can continue.

Then, instead of waiting, the code in the Task yields control. This means that it asks the event loop to pause the Task it is running in, and wake it up again at a future point once the thing it needs to wait for has happened.

The event loop can then select one of its other sleeping tasks to wake up and become the executing task instead. Or if none of them are able to awaken (because they’re all waiting for things to happen) then it can wait.

This way the CPU’s time can be shared between different tasks, all of which are executing code capable of yielding like this when they would otherwise wait.

https://bbc.github.io/cloudfit-public-docs/asyncio/asyncio-part-2

import asyncio

async def get_data_from_io():
    """Placeholder coroutine standing in for an awaitable IO read."""
    ...

async def process_data(data):
    """Placeholder coroutine standing in for awaitable processing of data."""
    ...

async def main():
    """Forever: await one piece of data, then await its processing."""
    # `True` is capitalised in Python; lowercase `true` is an undefined name.
    while True:
        data = await get_data_from_io()
        await process_data(data)

# Guarded so that importing this snippet does not start the endless loop.
if __name__ == "__main__":
    asyncio.run(main())

Future

You probably won’t create your own futures very often.

Unlike a coroutine object when a future is awaited it does not cause a block of code to be executed. Instead a future object can be thought of as representing some process that is ongoing elsewhere and which may or may not yet be finished. When you await a future the following happens if the process has:

Tasks

Task inherits from asyncio.Future.

Each event loop contains a number of tasks, and every coroutine that is executing is doing so inside a task.

async def coroutine_func():
    ...

task = asyncio.create_task(coroutine_func())
# same as
task = asyncio.get_event_loop().create_task(coroutine_func())

The method create_task takes a coroutine object as a parameter and returns a Task object, which inherits from asyncio.Future. The call creates the task inside the event loop for the current thread, and starts the task executing at the beginning of the coroutine’s code-block. The returned future will be marked as done() only when the task has finished execution. As you might expect the return value of the coroutine’s code block is the result() which will be stored in the future object when it is finished (and if it raises then the exception will be caught and stored in the future).

Avoid creating a task from synchronous code: asyncio.create_task() raises RuntimeError when no event loop is running in the current thread.

Calling sync blocking code

Docs

def blocking():
    ...

async def call_blocking():
    """Run blocking() in a worker thread so the event loop is not stalled."""
    # `await` is only valid inside a coroutine, hence the wrapper function.
    await asyncio.to_thread(blocking)
    # to_thread does something like (executor=None selects the default executor):
    await asyncio.get_running_loop().run_in_executor(None, blocking)

Programs

argparse

import argparse

parser = argparse.ArgumentParser(description='summer')
parser.add_argument('ints', metavar='N', type=int, nargs='+',
                    help='integer')
parser.add_argument('--sum', dest='func', action='store_const',
                    const=sum, default=max,
                    help='sums (default: find the max)')

args = parser.parse_args()
args.ints

subprocess

import subprocess

command = ["my.exe", arg, "--flag", "--option", choice]
result = subprocess.run(command, text=True, stdout=subprocess.PIPE, encoding="ascii")
return result.stdout

# doesn't wait for result
# shell=True suppresses the new command window
subprocess.Popen(command, shell=True)

sys

import sys

sys.argv # `python egg.py one two` -> ['egg.py', 'one', 'two']
pd.read_csv(sys.stdin)
sys.stdout.flush()
sys.exit(0)

sys.getsizeof(x) # size of x in bytes

Concurrency and Parallelism

asyncio (and threading) are usually best for IO-bound problems. multiprocessing is best for CPU-bound code.

Multi-processing

import multiprocessing as mp

mp.cpu_count()
proc = mp.Process(target=func, args=...)
proc.start()
proc.join() # wait for process to finish
proc.is_alive() # should be false

Shared memory:

# creating Array of int data type with space for 4 integers
array = mp.Array('i', 4) # 'd' for float
# creating Value of int data type
value = mp.Value('i')

proc = mp.Process(target=func2, args=(inputs, array, value))
# in the main process, these will also be modified
array[:]
value.value

Server process managers are more flexible than shared memory, but slower.

with mp.Manager() as manager:
    # creating a list in server process memory
    records = manager.list([1, 3])
    p = mp.Process(target=insert_record, args=(new_record, records))
    p.start()
    p.join()

Queues and Pipes

# creating multiprocessing Queue
q = mp.Queue()

# creating new processes
p1 = mp.Process(target=square_list, args=(mylist, q))
p2 = mp.Process(target=print_queue, args=(q,))

# q.put(color)
# q.get() # pops


# creating a pipe
parent_conn, child_conn = mp.Pipe()

# creating new processes
p1 = mp.Process(target=sender, args=(parent_conn,msgs))
p2 = mp.Process(target=receiver, args=(child_conn,))
# conn.send()
# msg = conn.recv()

C Interface

ctypes

import ctypes as ct

lib = ct.CDLL("lib.so")  # "lib.dll" on Windows

# int add(int x, int y);
s = lib.add(3, 2)

# double square(double x);
lib.square.restype = ct.c_double
sd = lib.square(ct.c_double(3))

# time_t time(time_t *);
lib.time.argtypes = (ct.POINTER(ct.c_time_t),)
lib.time.restype = ct.c_time_t

# void set_object(ObjectPtr obj);
lib.set_object.argtypes = ct.c_void_p,
lib.set_object.restype = None

# int div(double x, double y, double * r);
def err_chk(res, func, args):
    """errcheck hook for lib.div: pass a zero status through, else raise.

    ctypes calls this with the foreign function's raw result (res), the
    function object (func) and the tuple of call arguments (args).

    Returns res when it is 0; raises ZeroDivisionError for any other status.
    """
    # NOTE(review): assumes div() returns 0 on success -- confirm against the C source.
    if res == 0:
        return res
    raise ZeroDivisionError("lib.div reported a non-zero status")

lib.div.argtypes = ct.c_double, ct.c_double, ct.POINTER(ct.c_double)
lib.div.restype = ct.c_int
lib.div.errcheck = err_chk

res = ct.c_double()
lib.div(6, 2, ct.byref(res))
res.value  # 3

i = ct.c_int(42)  # ctypes is imported as `ct`, so the type is reached through it
i.value = -99  # ctypes instances are mutable through .value

Only these native Python objects can directly be used as parameters in C function calls:

Functions are assumed to return the C int type.

Custom Types

Define a from_param class method, which must return an integer/string/ctypes instance.

class Value:
    """Wrapper around an int that can be handed to a C function."""

    def __init__(self, value: int):
        self.value = value

    @classmethod
    def from_param(cls, obj):
        """Convert obj into the value the C call receives.

        ctypes invokes this for each entry of a function's argtypes, passing
        the Python argument as obj, so Value must be listed there
        (e.g. ``lib.cfunc.argtypes = (Value,)``) for it to be used.
        """
        # A classmethod has no `self`; the instance arrives as `obj`.
        return obj.value

lib.cfunc(Value(42))

Make sure you keep references to objects as long as they are used from C code. ctypes doesn’t, and if you don’t, they may be garbage collected!

Testing

doctest

def factorial(n):
    """Return the factorial of n, an exact integer >= 0.

    >>> [factorial(n) for n in range(6)]
    [1, 1, 2, 6, 24, 120]
    >>> factorial(-1)
    Traceback (most recent call last):
        ...
    ValueError: n must be >= 0
    """
    # Implemented (rather than left as a stub) so the doctests above pass.
    if n < 0:
        raise ValueError("n must be >= 0")
    result = 1
    for k in range(2, n + 1):
        result *= k
    return result

if __name__ == "__main__":
    import doctest
    doctest.testmod()

python -m doctest -v example.py

Debugging and Profiling

Logging

import logging

# basicConfig does nothing if the root logger already has handlers, so only the first call takes effect (unless force=True)
logging.basicConfig(level=logging.DEBUG, filename='app.log', filemode='w')
# without filename, output goes to stderr by default

logging.debug('message')
logging.info('message')
logging.warning('message')
logging.error('message')
logging.critical('message')

Debugger pdb

python3 -m pdb myscript.py

If the program exits abnormally pdb will automatically enter post-mortem debugging. After post-mortem debugging (or after normal exit of the program), pdb will restart the program.

Add breakpoint() to enter the interactive debugger.

Commands

h # help
q # quit
restart
whatis expression # returns type
p expression # evaluate and print
c # continue to next breakpoint
r # continue until function return
n # continue to next line
s # execute current line, stop at the first occasion
b(reak) [([filename:]lineno | function) [, condition]]
u(p) [count] # up the stack trace
d(own) [count] # down the stack trace

Profiling

python -m cProfile myscript.py

import cProfile
cProfile.run('main()')

Disassembly

import dis

def fun(x):
    """Return x + 1 (a tiny function to disassemble)."""
    return x + 1

# dis.dis() prints the disassembly itself and returns None, so wrapping it in
# print() would add a stray "None" line.
dis.dis(fun)