The collections module provides specialized data
structures.
Counter
c = Counter(iterable)c[item] += 1 to updatec.update(iterable) to update multiplec.most_common(2) returns 2 most common itemsc2 - c1OrderedDict [mostly irrelevant since dict
is now ordered]defaultdict
dd = defaultdict(list)dd["planets"].append("Mars")ChainMap
chain = ChainMap(dict1, dict2)deque
append, appendleft, extend,
extendleft, pop, popleftrotate(n), with n>0
for right-rotation, n<0 for
left-rotationnamedtuple
NamedTuple from typing moduleNote that defaultdict is all lower case, and not camel
case, for historical reasons.
import bisect
xs = ['a', 'b', 'c', 'p']
bisect.bisect_left(xs, x, lo=0, hi=len(xs), key=None)
bisect.bisect_left(xs, 'b') # 1
bisect.bisect_left(xs, 'd') # 3
bisect.bisect_left(xs, 'z') # 4
# bisect_left(xs, x): [ < x | x | x <= ]
# bisect_right(xs, x): [ <= x | x | x < ]
# insert into sorted list
bisect.insort(xs, 'd') # equivalent to insort_rightimport heapq
# uses a min-heap
q = []
heapq.heappush(q, value) # value = 'k', or = (3, 'k')
heapq.heapify(ls) # construct heap in linear time
item = heapq.heappop(q)
smallest = q[0] # effectively 'peek'
x = heapq.heappushpop(q, v) # faster than heappush() + heappop()from enum import Enum
class Color(Enum):
RED = 1
GREEN = 2
BLUE = 3
type(Color.RED) # <enum 'Color'>
isinstance(Color.GREEN, Color) # True
Color.RED is Color.RED # True
Color.RED is Color(1) # True
>>> Color.RED.name
'RED'
>>> Color.RED.value
1
len(Color) # 3
[c for c in Color]If the enums are required to be integers
from enum import IntEnum
# IntEnum is defined as:
class IntEnum(builtins.int, Enum):
...from datetime import datetime, date, time, timedelta, UTC
dt = datetime.now(tz=UTC)
d = dt.date() # use a name that does not shadow the imported date class
d = date(2020, 7, 1)
d += timedelta(days=1) # days=0, seconds=0, minutes=0, hours=0, weeks=0
dt.year
dt.month
dt.day
dt.hour
dt.minute
dt.second
dt.tzinfoClasses in the datetime module
datedatetimetimetimedeltatimezonetzinfoThe date, datetime, time, and timezone types share these common features:
date_time = datetime.strptime('Jun 1 2005 1:33PM', '%b %d %Y %I:%M%p')
datetime.strptime(sdt, "%Y%m%d%H%M")
datetime.now().replace(microsecond=0).isoformat()
from dateutil import parser
date_time = parser.parse("4th of July, 2015")import time
time.sleep(3) # secondsgraphlibgraph = {"C": {"A"}, "B": {"A"}}
sorter = TopologicalSorter(graph)
precedents = ["B", "C"]
sorter.add("D", *precedents)
sorter.static_order()
# ['A', 'C', 'B', 'D']
# raises CycleError if any cyclesThe numbers module defines ABCs such as:
comb(n,k), perm(n,k): combinations, permutations
gcd(*integers), lcm(*integers)
ceil(x), floor(x), fabs(x)
isclose(x, y, *, rel_tol=1e-09, abs_tol=0.0)
isqrt(n)
prod(iterable, start=1)
log(x), log2(x), log10(x)
exp(x), pow(x, y), sqrt(x)
sin(x), cos(x), tan(x)
atan2(y, x)
erf(x)
degrees(rad), radians(deg)
dist(p, q), hypot(*coords)
pi, e, tau, inf, nan
Complex math.
Exact precision decimals.
from decimal import Decimal as D
error = D('3.14') - D(3.14) # 3.14 doesn't have an exact IEEE-754 representation
ln10 = D('10').ln()from fractions import Fraction as F
F(1, 10) + F(1, 10) + F(1, 10) == F(3, 10)from random import randint, shuffle
randint(a,b) # from [a,b] incl
xs = list(range(9))
shuffle(xs) # shuffles in place and returns None
fmean() geometric_mean() harmonic_mean()
median() mode()
stdev() variance()
NormalDist(mu=0.0, sigma=1.0)
from_samples(data)
samples(m, seed=None)
pdf(x)
cdf(x)
inv_cdf(x)
zscore(x)
functoolsFor higher-order functions: functions that act on or return other functions.
@functools.cached_property(func)
functools.cmp_to_key(func)
@functools.lru_cache(user_function)
@functools.lru_cache(maxsize=128, typed=False)
func.cache_info() # to see hits and misses
@functools.total_ordering
functools.partial(func, /, *args, **keywords)
class functools.partialmethod(func, /, *args, **keywords)
functools.reduce(function, iterable[, initializer])
@functools.singledispatch
def fun(x):
print(x)
@fun.register(int)
@fun.register(float)
def _fun_nr(x):
print("nr", x)
fun.dispatch(int) # <function _fun_nr at ...>
fun.dispatch(str) # <function fun at ...>
fun.registry # mappingproxy
# Update a wrapper func to look like the wrapped func (preserve doc string etc)
@functools.wraps(wrapped, assigned=WRAPPER_ASSIGNMENTS, updated=WRAPPER_UPDATES)
def decorator(f):
@functools.wraps(f)
def decorated(*args, **kwargs): return f(*args, **kwargs)
return decoratedpartial has early-binding, while lambda has
late-binding
add = lambda x, y: x + y
n = 1
incr_lambda = lambda y: add(n, y)
incr_partial = partial(add, n)
incr_lambda(0), incr_partial(0) # 1, 1
n = 2
incr_lambda(0), incr_partial(0) # 2, 1itertoolsInfinite iterators:
count(10, step=1) --> 10 11 12 13 14 ...
cycle('ABCD') --> A B C D A B C D ...
repeat(10, 3) --> 10 10 10
Iterators terminating on the shortest input sequence:
accumulate([1,2,3,4,5], func=add, initial=10) --> 10 11 13 16 20 25
batched('ABCDEFG', n=3) --> ABC DEF G
chain('ABC', 'DEF') --> A B C D E F
chain.from_iterable(['ABC', 'DEF']) --> A B C D E F
compress('ABCDEF', [1,0,1,0,1,1]) --> A C E F
dropwhile(lambda x: x<5, [1,4,6,4,1]) --> 6 4 1
filterfalse(lambda x: x%2, range(10)) --> 0 2 4 6 8
groupby(sorted_data, keyfunc) --> (key, group) iterable
islice('ABCDEFG', 2, None, 1) --> C D E F G # (iterable, start, stop, step); positional-only
pairwise('ABCDEFG') --> AB BC CD DE EF FG
starmap(pow, [(2,5), (3,2), (10,3)]) --> 32 9 1000
takewhile(lambda x: x<5, [1,4,6,4,1]) --> 1 4
tee([1,2,3], n=2) --> ([1,2,3], [1,2,3])
zip_longest('ABCD', 'xy', fillvalue='-') --> Ax By C- D-Combinatoric iterators:
product('ABC', repeat=2) --> AA AB AC BA BB BC CA CB CC
permutations('ABC', 2) --> AB AC BA BC CA CB
combinations('ABC', 2) --> AB AC BC
combinations_with_replacement('ABC',2) --> AA AB AC BB BC CC
from typing import Any, TypeVar, cast
NT = TypeVar("NT", int, float)
# typing.Callable[[NT, NT], NT]
def sum(a: NT, b: NT) -> NT:
return a + b
xs: list[Any] = []
ys = cast(list[str], xs)Creates namedtuple classes. These are immutable.
from typing import NamedTuple
class Person(NamedTuple):
name: str
age: int
height: float
country: str = "ZA"
issubclass(Person, tuple) == True
p = Person("Joe", 25, 1.7)
p.namefrom abc import ABC, abstractmethod
class Base(ABC):
@abstractmethod
def foo(self):
pass
def bar(self): # concrete
return "y"
class SubClass(Base):
def foo(self):
return self.bar() + "x"Automatically adds generated special methods such as
__init__() and __repr__() to user-defined
classes.
from dataclasses import dataclass
@dataclass(init=True, repr=True, eq=True, order=False, unsafe_hash=False, frozen=False, slots=False)
@dataclass # same as above
class C:
name: str
price: float
quantity: int = 0 # with default value
def total_cost(self) -> float:
return self.price * self.quantityorder: __lt__(), __le__(),
__gt__(), and __ge__() methods will be
generated.unsafe_hash: If eq and frozen
are both true, a __hash__() method is generated. Otherwise,
can force generation of a hash method by setting this to
True.frozen: fields are read-only; assigning to fields will
generate an exception.from dataclasses import dataclass, field
@dataclass
class C:
mylist: list = field(default_factory=list)from contextlib import suppress
with suppress(FileNotFoundError):
os.remove('somefile.tmp')import string
string.ascii_letters
string.ascii_lowercase
string.ascii_uppercase
string.digits
string.hexdigits
string.octdigits
string.punctuation
string.printable
string.whitespaceimport re
regex = re.compile(pattern, re.IGNORECASE) # -> Pattern
# pos = start of string, endpos = end of string
search = regex.search(string, pos=0) # scans through the whole string
match = regex.match(string, pos=0) # scans only the beginning of the string
match = regex.fullmatch(string) # the whole string must match
s = "123abc"
m = re.match("[a-z]+", s) # None
m = re.search("[a-z]+", s) # "abc"
m = re.fullmatch("[0-9a-z]+", s) # -> Match
m = re.fullmatch("[0-9a-z]+", s + ".") # -> None
# m is a match object
m.group(0) # "abc"
m.group("name")
re.sub(pattern, replacement, string)
re.split(pattern, string, maxsplit=0)
re.split(r'\W+', 'a, bc, d.') # ['a', 'bc', 'd', '']
re.findall(r'\W+', 'a! bc; d.') # ['! ', '; ', '.']\w is [a-zA-Z0-9_] if ASCII flag
from string import Template
templ = Template("$x, $y")
s1 = templ.substitute(x=1, y=2)
s2 = templ.substitute({"x": 1, "y": 2})csvimport csv
# need to specify newline='', else
# - newlines embedded inside quoted fields will not be interpreted correctly
# - for "\r\n" line-endings, on write, an extra \r will be added
with open('f.csv', newline='') as f:
reader = csv.reader(f, delimiter=',', quotechar='"')
# OR reader = csv.DictReader(f, fieldnames=None)
for row in reader:
print(', '.join(row))
with open('f.csv', 'w', newline='') as f:
writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerow(some)
writer.writerows(someiterable)
with open('f.csv', 'w', newline='') as csvfile:
fieldnames = ['a', 'b']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerow({'a': 1, 'b': 2})
# csv.DictReader(f, fieldnames=None, restkey=None, restval=None, dialect='excel', *args, **kwds)jsonstring = '{ "name":"JK", "age":30}'
dictionary = json.loads(string)
string = json.dumps(obj, indent=4, sort_keys=True)
contents = json.load(file_obj)
json.dump(obj, file_obj)import gzip
content = b"Lots of content here"
c = gzip.compress(content)
m = gzip.decompress(c)
with gzip.open('f.txt.gz', 'wb') as f:
f.write(content)
with gzip.open('f.txt.gz', 'rb') as f:
content = f.read()
# compress an existing file
with open('f.txt', 'rb') as f_in:
with gzip.open('f.txt.gz', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)from zipfile import ZipFile
# will create the file if not exists
with ZipFile("out.zip", 'w') as zip:
for f in files:
zip.write(f)
zip.writestr("name", str_or_bytes)os.pathos.getcwd()
os.listdir(path) --> files, directories
# path = ".." or "/" to go up a directory
# path = ".", for current directory
os.walk(path) --> dirpath, dirnames, filenames; recursive
os.path.isfile(path)
os.path.join(path1, path2, ...)
os.path.split(path)
os.path.abspath(relpath)
os.makedirs(folder, exist_ok=True)pathlibhttps://docs.python.org/3/library/pathlib.html
from pathlib import Path
path = Path() # no argument defaults to the current directory
path.iterdir() # if path is dir, yields path objects of contents
path.glob(pattern)
path.rglob(pattern) # recursive glob, like path.glob(f"**/{pattern}")
path.joinpath(*path_fragments)
path / 'newdir' / 'newfile.txt' # joins
path.is_dir()
path.is_file()
path.exists()
path.mkdir(parents=False, exist_ok=False)
path.unlink(missing_ok=False) # removes file
path.with_suffix(".txt") # (a.txt).with_suffix(".gz") -> a.gz
path.relative_to(ancestor)
# Path("dir/hi.py")
path.name # "hi.py"
path.suffix # ".py"
path.stem # "hi"
path.parent
path.parts # ('dir', 'hi.py')
path.read_text(encoding=None, errors=None)
path.write_text(data, encoding=None, errors=None, newline=None)shutilimport shutil as sh
sh.copytree(s, d, dirs_exist_ok=True)
sh.copyfile(s, d)
sh.move(s, d)
sh.rmtree(o)
sh.copyfileobj(file_obj_src, file_obj_dst)tempfileimport tempfile
with tempfile.TemporaryFile(mode='w+b') as temp:
# use mode='w+t' for text, else uses binary mode
temp.writelines([])
with tempfile.NamedTemporaryFile() as temp:
print(temp.name)
# held in memory, and written to file after max_size
spooled = tempfile.SpooledTemporaryFile(max_size=1000,
mode='w+t',
encoding='utf-8')
# is named
tempfile.TemporaryDirectory()from urllib.request import Request, urlopen
a = "https://www.example.com" # a scheme is required, else urlopen raises ValueError
req = Request(a)
req.add_header('Accept-Encoding', 'gzip')
with urlopen(req) as response:
charset = response.info().get_content_charset()
content = response.read().decode(charset)Usually there is one stack per thread. In asyncio, each thread has an object called an Event Loop. The event loop contains within it a list of objects called Tasks. Each Task maintains a single stack, and its own execution pointer as well.
At any one time the event loop can only have one Task actually executing, whilst the other tasks in the loop are all paused. The currently executing task will continue to execute exactly as if it were executing a function in a normal (synchronous) program, right up until it gets to a point where it would have to wait for something to happen before it can continue.
Then, instead of waiting, the code in the Task yields control. This means that it asks the event loop to pause the Task it is running in, and wake it up again at a future point once the thing it needs to wait for has happened.
The event loop can then select one of its other sleeping tasks to wake up and become the executing task instead. Or if none of them are able to awaken (because they’re all waiting for things to happen) then it can wait.
This way the CPU’s time can be shared between different tasks, all of which are executing code capable of yielding like this when they would otherwise wait.
https://bbc.github.io/cloudfit-public-docs/asyncio/asyncio-part-2
import asyncio
async def get_data_from_io():
...
async def process_data(data):
...
async def main():
while True:
data = await get_data_from_io()
await process_data(data)
asyncio.run(main())You probably won’t create your own futures very often.
Unlike a coroutine object when a future is awaited it does not cause a block of code to be executed. Instead a future object can be thought of as representing some process that is ongoing elsewhere and which may or may not yet be finished. When you await a future the following happens if the process has:
Task inherits from asyncio.Future.
Each event loop contains a number of tasks, and every coroutine that is executing is doing so inside a task.
async def coroutine_func():
...
task = asyncio.create_task(coroutine_func())
# same as
task = asyncio.get_event_loop().create_task(coroutine_func())The method create_task takes a coroutine object as a parameter and returns a Task object, which inherits from asyncio.Future. The call creates the task inside the event loop for the current thread, and starts the task executing at the beginning of the coroutine’s code-block. The returned future will be marked as done() only when the task has finished execution. As you might expect the return value of the coroutine’s code block is the result() which will be stored in the future object when it is finished (and if it raises then the exception will be caught and stored in the future).
Probably avoid trying to create a task from synchronous code since event loop might not be running.
def blocking():
...
await asyncio.to_thread(blocking)
# to_thread does something like:
await asyncio.get_running_loop().run_in_executor(None, blocking)
import argparse
parser = argparse.ArgumentParser(description='summer')
parser.add_argument('ints', metavar='N', type=int, nargs='+',
help='integer')
parser.add_argument('--sum', dest='func', action='store_const',
const=sum, default=max,
help='sums (default: find the max)')
args = parser.parse_args()
args.intsimport subprocess
command = ["my.exe", arg, "--flag", "--option", choice]
result = subprocess.run(command, text=True, stdout=subprocess.PIPE, encoding="ascii")
return result.stdout
# doesn't wait for result
# shell=True suppresses the new command window
subprocess.Popen(command, shell=True)import sys
sys.argv # `python egg.py one two` -> ['egg.py', 'one', 'two']
pd.read_csv(sys.stdin)
sys.stdout.flush()
sys.exit(0)
sys.getsizeof(x) # size of x in bytes
asyncio: cooperative multitasking (1 processor)
threading: pre-emptive multitasking (1 processor)
multiprocessing: processes run at the same time on different processors
asyncio (and threading) are usually best for IO-bound problems. multiprocessing is best for
CPU-bound code.
import multiprocessing as mp
mp.cpu_count()
proc = mp.Process(target=func, args=...)
proc.start()
proc.join() # wait for process to finish
proc.is_alive() # should be falseShared memory:
# creating Array of int data type with space for 4 integers
array = mp.Array('i', 4) # 'd' for float
# creating Value of int data type
value = mp.Value('i')
proc = mp.Process(target=func2, args=(inputs, array, value))
# in the main process, these will also be modified
array[:]
value.value
Server process managers are more flexible than shared memory, but slower.
with mp.Manager() as manager:
# creating a list in server process memory
records = manager.list([1, 3])
p = mp.Process(target=insert_record, args=(new_record, records))
p.start()
p.join()Queues and Pipes
# creating multiprocessing Queue
q = mp.Queue()
# creating new processes
p1 = mp.Process(target=square_list, args=(mylist, q))
p2 = mp.Process(target=print_queue, args=(q,))
# q.put(color)
# q.get() # pops
# creating a pipe
parent_conn, child_conn = mp.Pipe()
# creating new processes
p1 = mp.Process(target=sender, args=(parent_conn,msgs))
p2 = mp.Process(target=receiver, args=(child_conn,))
# conn.send()
# msg = conn.recv()import ctypes as ct
lib = ct.CDLL("lib.so") # "lib.dll" on Windows
# int add(int x, int y);
s = lib.add(3, 2)
# double square(double x);
lib.square.restype = ct.c_double
sd = lib.square(ct.c_double(3))
# time_t time(time_t *);
lib.time.argtypes = (ct.POINTER(ct.c_time_t),)
lib.time.restype = ct.c_time_t
# void set_object(ObjectPtr obj);
lib.set_object.argtypes = ct.c_void_p,
lib.set_object.restype = None
# int div(double x, double y, double * r);
def err_chk(res, func, args):
if res == 0:
return res
raise ZeroDivisionError()
lib.div.argtypes = ct.c_double, ct.c_double, ct.POINTER(ct.c_double)
lib.div.restype = ct.c_int
lib.div.errcheck = err_chk
res = ct.c_double()
lib.div(6, 2, ct.byref(res))
res.value # 3
i = ct.c_int(42)
i.value = -99
Only these native Python objects can directly be used as parameters in C function calls:
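None: passed as a C NULL pointer
bytes and str: passed as a pointer to the memory block holding their data (char * or wchar_t *)
int: passed as the platform's default C int type; the value is masked to fit into the C type
Functions are assumed to return the C int type.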
Define a from_param class method, which must return an
integer/string/ctypes instance.
class Value:
def __init__(self, value: int):
self.value = value
@classmethod
def from_param(cls, obj):
return obj.value
lib.cfunc(Value(42))Make sure you keep references to objects as long as they are used
from C code. ctypes doesn’t, and if you don’t, they may be
garbage collected!
doctestdef factorial(n):
"""Return the factorial of n, an exact integer >= 0.
>>> [factorial(n) for n in range(6)]
[1, 1, 2, 6, 24, 120]
>>> factorial(-1)
Traceback (most recent call last):
...
ValueError: n must be >= 0
"""
pass
if __name__ == "__main__":
import doctest
doctest.testmod()python -m doctest -v example.py
import logging
# basicConfig can only be called once
logging.basicConfig(level=logging.DEBUG, filename='app.log', filemode='w')
# default outputs to stderr
logging.debug('message')
logging.info('message')
logging.warning('message')
logging.error('message')
logging.critical('message')pdbpython3 -m pdb myscript.py
If the program exits abnormally pdb will automatically
enter post-mortem debugging. After post-mortem debugging (or after
normal exit of the program), pdb will restart the
program.
Add breakpoint() to enter the interactive debugger.
Commands
h # help
q # quit
restart
whatis expression # returns type
p expression # evaluate and print
c # continue to next breakpoint
r # continue until function return
n # continue to next line
s # execute current line, stop at the first occasion
b(reak) [([filename:]lineno | function) [, condition]]
u(p) [count] # up the stack trace
d(own) [count] # down the stack trace
python -m cProfile myscript.py
import cProfile
cProfile.run('main()')import dis
def fun(x):
return x + 1
dis.dis(fun) # prints the disassembly itself and returns None