Full API

A suite of functions to promote functional programming in Python.

DFP, or Dogmatic Functional Procedures, is a library of functions that make functional programming in Python easier. It contains many functions for building useful abstractions through function composition and transformations over data. Many of these functions are built with generic data types in mind and are not specific to astrophysical data.

class dfp.Sequence(data: Iterable)

Abstraction of an Iterable that supports map, filter, and reduce. The intention of this class is to provide an alternative way of applying functional programming operations from the perspective of an object. For example, we can instantiate Sequence by passing it a data-structure:

>>> from src.dfp import Sequence
>>> seq = Sequence(range(100))

This will create a new Sequence object, to which we may then apply operations, such as doubling the value of each element in the sequence:

>>> seq.map(lambda x: x * 2)

The return of this .map() call is another Sequence with each element being doubled.

The alternative is to use the lmap function, passing both the function and the data-structure to map over:

>>> from src.dfp import lmap
>>> lmap(lambda x: x * 2, range(100))

One benefit of the Sequence class is that chaining operations, depending on your perspective, may look cleaner than the function based approach. For instance, let’s only double the even numbers, removing any odd elements before:

>>> (seq
...  .filter(lambda x: x % 2 == 0)
...  .map(lambda x: x * 2))

The function only approach to this same problem may be to use pipe or compose:

>>> from src.dfp import pipe, lfilter
>>> pipe(
...     range(100),
...     lambda seq: lfilter(lambda x: x % 2 == 0, seq),
...     lambda seq: lmap(lambda x: x * 2, seq))

In this latter example, we must ‘wrap’ our lfilter and lmap to prevent Python from immediately calling these functions.

The design of DFP is flexible and based on the user’s preference. If you want to use Sequence, there is no compromise, as the class’s methods resolve to functions such as lmap and tmap.
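To make the delegation idea concrete, here is a minimal sketch of a chainable wrapper. MiniSequence is a hypothetical stand-in written for illustration; it uses Python's built-in map, filter, and functools.reduce rather than DFP's own functions, and it is not the library's implementation.

```python
from functools import reduce
from typing import Any, Callable, Iterable


class MiniSequence:
    """Hypothetical stand-in for dfp.Sequence (illustration only)."""

    def __init__(self, data: Iterable):
        # Store the data eagerly so the sequence can be traversed repeatedly.
        self.data = tuple(data)

    def map(self, f: Callable) -> "MiniSequence":
        # Delegate to the built-in map and wrap the result again.
        return MiniSequence(map(f, self.data))

    def filter(self, f: Callable) -> "MiniSequence":
        # Delegate to the built-in filter and wrap the result again.
        return MiniSequence(filter(f, self.data))

    def reduce(self, f: Callable, init: Any = 0) -> Any:
        # Delegate to functools.reduce; the raw result is returned.
        return reduce(f, self.data, init)


result = (MiniSequence(range(10))
          .filter(lambda x: x % 2 == 0)
          .map(lambda x: x * 2))
print(result.data)  # (0, 4, 8, 12, 16)
```

Because each method returns a new wrapper, calls can be chained without intermediate variables.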

Methods

filter:

Filter a sequence.

map:

Apply a function to each element.

reduce:

Apply the reduce operation to the sequence.

filter(f)

Filter a sequence of elements where each element is only included in the new sequence if f(x) == True.

Parameters:

f (Callable) – The boolean function to apply to each element that determines if the element should be included in the resulting sequence.

Returns:

A new sequence with elements only included if f(x) == True

>>> from src.dfp import Sequence
>>> Sequence(range(10)).filter(lambda x: x % 2 == 0)
Sequence((0, 2, 4, 6, 8))
map(f, parallel: bool = False, p_workers: int = 4)

Apply function f to every element of the sequence resulting in a new sequence of elements

Parameters:
  • f (Callable) – The function to apply to each element.

  • parallel (Bool) – (default: False) Whether to run the map in parallel threads.

  • p_workers (int) – (default: 4) The number of parallel threads.

Returns:

A new sequence with each element being f(x).

>>> from src.dfp import Sequence
>>> Sequence(range(5)).map(lambda x: x * 2)
Sequence((0, 2, 4, 6, 8))
reduce(f, init: Any = 0)

Apply a reduction operation to the sequence. If the result is not a singleton, a new Sequence is returned; otherwise the singleton is returned.

Parameters:
  • f (Callable) – The reduction function to apply to each element of the sequence.

  • init (Any) – (default: 0) The initial value.

Returns:

A new sequence if the result is not a singleton.

>>> from src.dfp import Sequence
>>> Sequence(range(10)).reduce(lambda t, s: t + s)
45
>>> Sequence(range(10)).reduce(lambda t, s: t+[s] if s%2==0 else t, [])
Sequence([0, 2, 4, 6, 8])

Reduction operations don’t actually have to ‘reduce’ anything; they can also expand the sequence. For example, here we construct a new sequence in which each element is included twice, so the result is twice the length of the original sequence.

>>> Sequence(range(10)).reduce(lambda t, s: t + [s, s], [])
Sequence([0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9])
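The same three reductions can be sketched with the standard library's functools.reduce, which makes the role of the accumulator t and the initial value explicit. This is an illustration using only the standard library, not DFP's code; note that functools.reduce takes the initial value as its third argument.

```python
from functools import reduce

numbers = range(10)

# Sum: the accumulator starts at 0 and stays a single number.
total = reduce(lambda t, s: t + s, numbers, 0)

# Filter: the accumulator is a list that only grows for even elements.
evens = reduce(lambda t, s: t + [s] if s % 2 == 0 else t, numbers, [])

# Expansion: every element is appended twice, so the result is longer
# than the input.
doubled_up = reduce(lambda t, s: t + [s, s], range(3), [])

print(total)       # 45
print(evens)       # [0, 2, 4, 6, 8]
print(doubled_up)  # [0, 0, 1, 1, 2, 2]
```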
dfp.add(record, key, value)

Add a field to a record in a non-destructive way.

Parameters:
  • record (Tuple[tuple]) – The record to add a field to.

  • key (str) – The name of the new field.

  • value (Any) – The value of the new field.

Returns:

a new record with the field added.

>>> record = (('a', 1), ('b', 2))
>>> add(record, 'c', 3)
(('a', 1), ('b', 2), ('c', 3))
>>> record
(('a', 1), ('b', 2))

If the key already exists, another tuple is added, therefore preserving the history of changes:

>>> add(record, 'a', 2)
(('a', 1), ('b', 2), ('a', 2))

When used in conjunction with pluck_item/pluck_list, one needn’t worry about the duplicated keys however, as the ‘latest’ value will be returned.

>>> pluck_item('a', add(record, 'a', 2))
2
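The append-then-read-latest behaviour described above can be sketched in a few lines. The helper names add_field and pluck_latest are hypothetical and written only for this illustration; they mirror the documented behaviour of add and pluck_item but are not the library's implementation.

```python
def add_field(record, key, value):
    # Tuples are immutable, so concatenation builds a new record and
    # leaves the original untouched.
    return record + ((key, value),)


def pluck_latest(name, record):
    # Scan every pair and keep the last match, so later additions win.
    value = None
    for key, val in record:
        if key == name:
            value = val
    return value


record = (('a', 1), ('b', 2))
updated = add_field(record, 'a', 2)

print(updated)                     # (('a', 1), ('b', 2), ('a', 2))
print(pluck_latest('a', updated))  # 2
print(record)                      # (('a', 1), ('b', 2))
```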
dfp.alloc(value, length) List[Any]

Create a list with values of specified length

Parameters:
  • value (Any) – The value of every element in the new list.

  • length (int) – The length of the allocated list.

>>> alloc(0, 5)
[0, 0, 0, 0, 0]
>>> alloc(["?"], 5)
[['?'], ['?'], ['?'], ['?'], ['?']]
>>> alloc(alloc(0, 5), 5)
[[0, 0, 0, 0, 0],
 [0, 0, 0, 0, 0],
 [0, 0, 0, 0, 0],
 [0, 0, 0, 0, 0],
 [0, 0, 0, 0, 0]]
dfp.but_last(lst)

Get all elements except the last one

dfp.compose(*funs)

Function composition e.g. g(f(x))

dfp.dataclass_to_record(dc)

Convert a dataclass to a record

dfp.dataframe_to_records(df) tuple

Convert a dataframe to records

dfp.filesystem_leaves(path: str) List[str]

Recursively find all files from path

dfp.find(token, inlist) int | None

Return the first index where token is found in inlist. If the token is not found, None is returned.

dfp.first(lst)

Return the first element of an iterable.

Parameters:

lst – The iterable to index into.

Returns:

The first element.

>>> from src.dfp import first
>>> first([0, 1, 2])
0
dfp.first_rest(lst) tuple

Deconstruct the first and rest of an iterable in one statement.

Parameters:

lst – The iterable to deconstruct.

Returns:

A tuple with the first element being the first element of the list and the second element is the rest of the iterable.

>>> from src.dfp import first_rest
>>> first_rest([0, 1, 2])
(0, [1, 2])

This makes assigning to variables simpler than passing the same iterable to both the first and rest functions.

>>> my_list = [0, 1, 2]
>>> f, r = first_rest(my_list)
>>> f
0
>>> r
[1, 2]
dfp.flatten(lst)

Flatten a nested list into a single list

dfp.flatten_dict(dct: dict, key_join_fn: ~typing.Callable[[~typing.Any, ~typing.Any], ~typing.Any] = <function <lambda>>) dict

Flatten a nested dictionary.

Flatten a nested dictionary by joining the keys as defined by key_join_fn.

>>> from dfp import flatten_dict
>>> nested_dict = {"my-key": {"foo": {"bar": {"baz": "nested-value"}}}}
>>> flattened_dict = flatten_dict(nested_dict)
>>> flattened_dict
{"my-key.foo.bar.baz": "nested-value"}

By default, key_join_fn joins the nested keys by a . symbol. Though this behaviour can be user defined by passing a joining function that takes two keys as arguments.

>>> flattened_dict = flatten_dict(
...     nested_dict,
...     key_join_fn=lambda key1, key2: f"{key1}-{key2}")
>>> flattened_dict
{"my-key-foo-bar-baz": "nested-value"}

You could even only use the most nested key:

>>> flatten_dict(nested_dict, key_join_fn=lambda k1, k2: k2)
{"baz": "nested-value"}

Parameters:
  • dct (dict) – The nested dictionary to flatten.

  • key_join_fn (Callable) – The function to join two keys together.

Returns:

A flattened dictionary with keys joined by key_join_fn.
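One way to picture how key_join_fn is applied is a short recursive sketch. flatten_dict_sketch and its _prefix parameter are hypothetical names used for illustration; this assumes keys are joined pairwise from the outermost level inward, which reproduces the documented outputs, but it is not necessarily how DFP implements flatten_dict.

```python
def flatten_dict_sketch(dct, key_join_fn=lambda k1, k2: f"{k1}.{k2}",
                        _prefix=None):
    flat = {}
    for key, value in dct.items():
        # Top-level keys are used as-is; nested keys are joined to the
        # key path accumulated so far.
        full_key = key if _prefix is None else key_join_fn(_prefix, key)
        if isinstance(value, dict):
            # Recurse into nested dictionaries, carrying the joined key.
            flat.update(flatten_dict_sketch(value, key_join_fn, full_key))
        else:
            flat[full_key] = value
    return flat


nested = {"my-key": {"foo": {"bar": {"baz": "nested-value"}}}}

print(flatten_dict_sketch(nested))
# {'my-key.foo.bar.baz': 'nested-value'}
print(flatten_dict_sketch(nested, key_join_fn=lambda k1, k2: k2))
# {'baz': 'nested-value'}
```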

dfp.for_each(*args, **kwargs)

Apply f to each element of lst

for_each abstracts a simple for loop where a function f is applied to each element of lst. for_each doesn’t return anything but f can be used to add the result of f to a list within the scope of the caller.

Parameters:
  • f (Callable) – The function to call/apply to each element of lst.

  • lst (Iterable) – The iterable list/tuple/ etc that f should be applied to.

Examples

FIXME: Add docs.
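A minimal sketch of the loop that for_each abstracts, assuming the argument order (f, lst) given in the parameter list. for_each_sketch is a hypothetical stand-in, not the library's function; it shows how f can collect results in a list owned by the caller.

```python
def for_each_sketch(f, lst):
    # Call f purely for its side effects; nothing is returned.
    for element in lst:
        f(element)


collected = []
returned = for_each_sketch(lambda x: collected.append(x * 2), range(3))

print(collected)  # [0, 2, 4]
print(returned)   # None
```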

dfp.group_by(key, iterable)

Group records by key

dfp.has_props(record, where) bool

Check if record has a value with key

dfp.identity(x)

The identity function, return the input.

Parameters:

x (Any) – The value to return.

Returns:

The input.

dfp.inverse(fun)

Invert a boolean function.

Parameters:

fun (Callable) – The function whose result should be inverted.

Returns:

A new function that returns the inverse of fun’s result.

Examples

>>> inverse(lambda: False)()
True
>>> inverse(lambda x: x>0)(1)
False
dfp.itemise(lst, idx_name: str = 'idx', val_name: str = 'val')

Named-tuple enumeration

dfp.itemize(lst, idx_name: str = 'idx', val_name: str = 'val')

Named-tuple enumeration

dfp.join(left, right, by: str | List[str], how: str = 'inner')

Join records

dfp.join_inner(left, right, by)

Join records by inner

dfp.join_left(left, right, by)

Join records by left

dfp.join_paths(*args) str

Join paths together.

Parameters:

*args

All the paths you wish to join together.

Returns:

A string representation of the joined path.

>>> from src.dfp import join_paths
>>> join_paths("/path/to", "something")
'/path/to/something'

Every argument except the last is assumed to be a folder. For example:

>>> join_paths("path/to", "something", "else")
'path/to/something/else'

This function also works with pathlib.Path values, but always returns a str.

>>> from pathlib import Path
>>> join_paths(Path("/path/"), "//to/something")
'/path/to/something'

Here we also see the advantage of using join_paths where the resulting path is always clean, in that it doesn’t have duplicate ‘/’ values.
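The cleaning behaviour can be approximated with the standard library. The sketch below is an assumption about how such a function might work, not DFP's implementation: join_paths_sketch is a hypothetical name, and it uses posixpath so that the result is the same on every platform.

```python
import posixpath
from pathlib import PurePosixPath


def join_paths_sketch(*args):
    # Accept both str and Path-like values by converting to str first.
    parts = [str(arg) for arg in args]
    # Strip leading slashes from later parts so they are appended to the
    # first part instead of being treated as new absolute paths.
    head, tail = parts[0], [part.lstrip("/") for part in parts[1:]]
    # normpath collapses any remaining duplicate separators.
    return posixpath.normpath(posixpath.join(head, *tail))


print(join_paths_sketch("/path/to", "something"))
# /path/to/something
print(join_paths_sketch(PurePosixPath("/path/"), "//to/something"))
# /path/to/something
```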

dfp.join_right(left, right, by)

Join records by right

dfp.keys(record: Dict[str, Any] | Tuple[Tuple[str, Any]]) Tuple[str]

Return all the keys available in the record

Parameters:

record – The record to return keys from.

Returns:

The keys.

>>> from src.dfp import keys
>>> record = (('a', 1), ('b', 2))
>>> keys(record)
('a', 'b')

Keys will only return the unique set of keys. Therefore, if you’ve added multiple keys of the same name, these ‘duplicate’ keys will only appear once.

>>> record = (('a', 1), ('a', 2))
>>> keys(record)
('a',)
dfp.label_record(labels, record)

Label a record

dfp.label_records(labels, records)

Label many records

dfp.last(lst)

Get the last element of an iterable

Parameters:

lst (Iterable) – The iterable to index over.

Returns:

The last item of the iterable.

dfp.lfilter(*args, **kwargs)

Eagerly filter a list returning a list of elements where f(x) is True.

Parameters:
  • f (Callable) – The function to apply to each element to test if should be included in the list.

  • lst (Iterable) – The iterable to filter.

Returns:

A list of elements where f(x) is True

>>> from src.dfp import lfilter
>>> lfilter(lambda x: x % 2 == 0, range(10))
[0, 2, 4, 6, 8]
dfp.lmap(*args, **kwargs)

Apply function f to each element of lst. Return the results as a list.

Parameters:
  • f (Callable) – The function to apply to each element.

  • lst (Iterable) – The iterable of elements to apply the function to.

  • parallel (boolean) – Boolean (false by default) flag that specifies if the function is applied in parallel using multiple threads. For very short functions/iterables this is slower. However, if your function is dependent on io, then you can get very fast speed-ups with this. Results are still in order.

  • p_workers (int) – The number of workers to run in parallel.

  • progress_fn (Callable) – The function to create a progress bar.

  • p_type (str) – The type of multi-processing to use (i.e. thread or process).

Returns:

A list with f applied to each element of lst.

>>> from dfp import lmap
>>> lmap(lambda x: x*2, range(5))
[0, 2, 4, 6, 8]

See tmap for more examples.

dfp.lreduce(*args, **kwargs)

Eagerly apply a reduce operation, returning a list if the result is not a singleton.

Parameters:
  • f – The reduction function f(x, y) -> z, e.g. lambda sum, element: sum + element

  • lst – The iterable to reduce.

  • init – The initial value before applying f for the first time.

Returns:

A singleton element or a list of elements as defined by how f reduces the iterable.

>>> from src.dfp import lreduce
>>> lreduce(lambda sum, element: sum + element, range(10), 0)
45

Reduce operations are very general, in that it’s possible to re-implement the map and filter methods.

>>> lreduce(lambda lst, el: lst + [el] if el%2==0 else lst, range(10), [])
[0, 2, 4, 6, 8]
dfp.lzip(*lst)

Eagerly zip iterables returning a list.

Parameters:

lst (Iterable) – The iterables to zip together.

Returns:

List of zipped elements.

>>> from src.dfp import lzip
>>> lzip(['a', 'b'], [1, 2])
[('a', 1), ('b', 2)]
dfp.member(token, in_list)

Return the rest of an iterable, starting from the first occurrence of token. For example,

>>> member('s', ['a', 's', 'b'])
['s', 'b']

If the token is not found, an empty iterable is returned
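The behaviour described above can be sketched as a linear scan. member_sketch is a hypothetical stand-in that assumes the input supports slicing and that an empty list is the "empty iterable" returned on a miss; the library may differ in both respects.

```python
def member_sketch(token, in_list):
    for index, element in enumerate(in_list):
        if element == token:
            # Return everything from the first match onwards,
            # including the token itself.
            return in_list[index:]
    # No match: return an empty list.
    return []


print(member_sketch('s', ['a', 's', 'b']))  # ['s', 'b']
print(member_sketch('z', ['a', 's', 'b']))  # []
```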

dfp.merge_dicts(*dicts) dict

Merge many dictionaries into one dictionary.

This works best when all dictionaries have the same keys.

The values of the dictionaries will be accumulated into a list:

>>> from dfp import merge_dicts
>>> dict1, dict2 = {'my-key': 1}, {'my-key': 2}
>>> merged_dicts = merge_dicts(dict1, dict2)
>>> merged_dicts
{'my-key': [1, 2]}

If the keys don’t match between dictionaries then the lengths of the accumulated lists won’t match. This may be okay for your use-case, but just know we don’t check for this.

>>> dict1, dict2 = {'my-key-1': 1}, {'my-key-1': 2, 'my-key-2': 1}
>>> merged_dicts = merge_dicts(dict1, dict2)
>>> merged_dicts
{'my-key-1': [1, 2], 'my-key-2': [1]}
Parameters:

dicts – The dictionaries to be merged.

Returns:

A single dictionary with the values from each dictionary being accumulated into a list.
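The accumulation described above can be sketched with collections.defaultdict. merge_dicts_sketch is a hypothetical stand-in for illustration, not the library's implementation; it reproduces the documented outputs, including the uneven list lengths when keys differ.

```python
from collections import defaultdict


def merge_dicts_sketch(*dicts):
    merged = defaultdict(list)
    for dct in dicts:
        for key, value in dct.items():
            # Each key collects the values seen for it, in argument order.
            merged[key].append(value)
    # Return a plain dict so missing keys raise KeyError as usual.
    return dict(merged)


print(merge_dicts_sketch({'my-key': 1}, {'my-key': 2}))
# {'my-key': [1, 2]}
print(merge_dicts_sketch({'my-key-1': 1}, {'my-key-1': 2, 'my-key-2': 1}))
# {'my-key-1': [1, 2], 'my-key-2': [1]}
```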

dfp.none_fn(*args, **kwargs)

Always return None

dfp.nth(*args, **kwargs)

Return the n-th element (using 0-based indexing) of an iterable.

Parameters:
  • lst (Iterable) – The iterable to retrieve the element from.

  • n (int) – The index of the element to retrieve.

Returns:

The n-th element of lst.

>>> nth(['a', 'b', 'c'], 2)
'c'
dfp.orderedset(x)

Ordered set

dfp.picknmix(*iterables)

Take elements from iterables one at a time

Parameters:

iterables (Iterable[Iterable]) – The iterables to sample from.

Returns:

A tuple of successive elements from each iterable in turn.

>>> picknmix([0, 2], [1, 3])
(0, 1, 2, 3)

picknmix only takes elements up to the length of the shortest iterable:

>>> picknmix([0, 2], [1])
[0, 1]
>>> picknmix((0, 2), (1, 3))
(0, 1, 2, 3)
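Interleaving that stops at the shortest input is what zip followed by flattening produces. The sketch below assumes that equivalence and always returns a tuple; picknmix_sketch is a hypothetical name and the library's return type may differ.

```python
from itertools import chain


def picknmix_sketch(*iterables):
    # zip stops at the shortest iterable; chain.from_iterable then
    # flattens the resulting groups into one interleaved sequence.
    return tuple(chain.from_iterable(zip(*iterables)))


print(picknmix_sketch([0, 2], [1, 3]))  # (0, 1, 2, 3)
print(picknmix_sketch([0, 2], [1]))     # (0, 1)
```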

dfp.pipe(*args)

Pipe data through functions
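Piping can be pictured as a left-to-right fold over the functions. The sketch below assumes the calling convention used in the Sequence examples, where the data comes first and the functions follow; pipe_sketch is a hypothetical stand-in, not the library's implementation.

```python
from functools import reduce


def pipe_sketch(data, *funs):
    # Feed the data to the first function, its result to the second,
    # and so on, returning the final result.
    return reduce(lambda acc, fn: fn(acc), funs, data)


result = pipe_sketch(" dfp ", str.strip, str.upper)
print(result)  # DFP
```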

dfp.pluck_first(name: str, iterable)

Pluck a name from many records and return the first

dfp.pluck_item(name: str | List[str], iterable: Dict[str, Any] | Tuple[Tuple[str, Any]]) Any

Get value corresponding to the key name of a record.

Parameters:
  • name (str) – The name of the key to retrieve the value from.

  • iterable (Record or dataclass.) – The record or class to retrieve from.

Returns:

The value corresponding with the found key. If the key is not found then None.

>>> record = (('a', 2), ('b', 1))
>>> pluck_item('a', record)
2

Pluck item will return the last value if there are duplicate keys. This is to allow the tracking of historical changes. Take for example:

>>> new_record = add(record, "b", 3)
>>> new_record
(('a', 2), ('b', 1), ('b', 3))

We see that new_record contains two duplicate keys, the latter of which is the new one we just added. When we use pluck_item, the most recent key will be used. Therefore, one can view the add function as also an update method.

>>> pluck_item('b', new_record)
3

The name to pluck from the iterable can also be an iterable within itself. That means you can provide nested keys to pluck.

>>> new_record = (('a', (('b', 3),)), ('b', 1))
>>> pluck_item(['a', 'b'], new_record)
3
dfp.pluck_items(names: List[str], iterable: Dict[str, Any] | Tuple[Tuple[str, Any]]) Tuple[Any]

Pluck many items for a single record

dfp.pluck_list(name: str, iterable: Dict[str, Any] | Tuple[Tuple[str, Any]]) Tuple[Any]

Pluck an item from many records

dfp.port(filename: str | Path, fn: Callable, read_mode: str = 'r') str

Read and write to a stream

dfp.port_csv(filename: str | Path, content: list[str] | None = None) str | list[str]

Read and write to a CSV format into a stream.

dfp.port_json(filename: str | Path, content: dict | None = None) str | dict

Read and write to a json format.

dfp.port_lines(filename: str | Path, content: list[str] | None = None) str | list[str]

Read and write lines to a stream

dfp.port_pickle(filename: str | Path, content: Any | None = None) str | Any

Read and write a pickle format.

dfp.printr(x, fun=<function <lambda>>)

Debugging statement for piping

dfp.record_to_dataclass(dc_type, record)

Convert a record to a dataclass

dfp.records_to_dataframe(records) pandas.DataFrame

Convert records to a dataframe. Warning: this assumes all records have the same keys.

dfp.remove(record, key)

Remove a field from a record in a non-destructive way.

Parameters:
  • record (Tuple[tuple]) – The record to remove a field from.

  • key (str) – The name of the field to remove.

Returns:

A new record with the field removed.

>>> record = (('a', 1), ('b', 2), ('c', 3))
>>> remove(record, 'c')
(('a', 1), ('b', 2))
dfp.rest(lst)

Return the iterable excluding the first element.

Parameters:

lst – The iterable to index into.

Returns:

The iterable without the first element.

>>> from src.dfp import rest
>>> rest([0, 1, 2])
[1, 2]
dfp.second(lst)

Return the second element of an iterable.

Parameters:

lst – The iterable to index into.

Returns:

The second element of the iterable

>>> from src.dfp import second
>>> second([0, 1, 2])
1
dfp.slice(lst, start=None, stop=None, step=1)

Slice an iterable by a start/stop/step index.

Parameters:
  • lst – The iterable to index into.

  • start – The start index, default is the start of the iterable.

  • stop – The end index, default is the end of the iterable.

  • step – How many indexes to step by, default is 1.

>>> from src.dfp import slice
>>> slice(list(range(10)), 1)
[1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> slice(list(range(10)), stop=5)
[0, 1, 2, 3, 4]
>>> slice(list(range(10)), start=2, stop=9, step=2)
[2, 4, 6, 8]
dfp.spread(records, key_col, val_col)

Spread a table

dfp.take(lst, n)

Take a number of elements from a sequence

dfp.take_subset(lst: Iterable, indexes: Iterable, bools: bool = False) tuple

Take indexes from lst.

Take elements from lst using indexes to denote which elements to take. This is equivalent to [lst[idx] for idx in indexes], where the result is a subset of lst.

Parameters:
  • lst (Iterable) – The iterable to take a subset from.

  • indexes (Iterable) – An iterable of indexes.

  • bools (bool) – Optional argument to specify if indexes consists of a list of indexes (bools=False) or if indexes is a list of boolean values where True denotes that element x_i should be included in the subset.

>>> from src.dfp import take_subset
>>> lst = ['a', 'b', 'c']
>>> take_subset(lst, [0, 1])
('a', 'b')

In this example, we create a list named lst with the elements ‘a’, ‘b’, and ‘c’. Then we take a subset of this list using the indexes 0 and 1.

We can also specify boolean values as indexes by passing bools=True. In this case, lst and indexes must be the same length. If the boolean value at index i is True, the element at index i in lst is included in the subset. For example:

>>> indexes = [True, False, True]
>>> assert len(indexes) == len(lst)
>>> take_subset(lst, indexes, bools=True)
('a', 'c')

Returns:

A tuple representing the subset of lst.
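Both selection modes can be sketched with the standard library: plain indexing for integer indexes and itertools.compress for a boolean mask. take_subset_sketch is a hypothetical stand-in for illustration and is not the library's implementation.

```python
from itertools import compress


def take_subset_sketch(lst, indexes, bools=False):
    if bools:
        # compress keeps the elements whose matching selector is truthy.
        return tuple(compress(lst, indexes))
    # Otherwise treat indexes as integer positions into lst.
    return tuple(lst[idx] for idx in indexes)


letters = ['a', 'b', 'c']
print(take_subset_sketch(letters, [0, 1]))                          # ('a', 'b')
print(take_subset_sketch(letters, [True, False, True], bools=True))  # ('a', 'c')
```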

dfp.tfilter(*args, **kwargs)

Eagerly filter a list returning a tuple of elements where f(x) returns True.

Parameters:
  • f (Callable) – The function that returns True or False; if True, the element is included in the result.

  • lst (Iterable) – The iterable to filter.

Returns:

Tuple of elements where f(x) is True

>>> from src.dfp import tfilter
>>> tfilter(lambda x: x % 2 == 0, range(10))
(0, 2, 4, 6, 8)
dfp.thread(*args)

Pipe data in a list style formatting

dfp.tmap(*args, **kwargs)

Apply function f to each element of lst. Return the results as a tuple.

Parameters:
  • f (Callable) – The function to apply to each element.

  • lst (Iterable) – The iterable of elements to apply the function to.

  • parallel (boolean) – Boolean (false by default) flag that specifies if the function is applied in parallel using multiple threads. For very short functions/iterables this is slower. However, if your function is dependent on io, then you can get very fast speed-ups with this. Results are still in order.

  • p_workers (int) – The number of workers to run in parallel.

  • progress_fn (Callable) – The function to create a progress bar.

  • p_type (str) – The type of multi-processing to use (i.e. thread or process).

Returns:

A tuple with f applied to each element of lst.

Basic Example

>>> from dfp import tmap
>>> tmap(lambda x: x*2, range(5))
(0, 2, 4, 6, 8)
>>> tmap(lambda x: x*2, range(5), parallel=True)
(0, 2, 4, 6, 8)
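An order-preserving threaded map can be sketched with concurrent.futures. This is an assumption about the general technique, not a description of DFP's internals: threaded_tmap is a hypothetical name, and only the p_workers parameter is borrowed from the documentation above.

```python
from concurrent.futures import ThreadPoolExecutor


def threaded_tmap(f, lst, p_workers=4):
    # Executor.map yields results in input order, even when the calls
    # themselves finish out of order.
    with ThreadPoolExecutor(max_workers=p_workers) as pool:
        return tuple(pool.map(f, lst))


print(threaded_tmap(lambda x: x * 2, range(5)))  # (0, 2, 4, 6, 8)
```

Threads help most when f spends its time waiting on I/O; for short CPU-bound functions the scheduling overhead usually outweighs any gain.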

Progress Bars

With tmap, the user can enable a progress bar by using the argument progress=True.

>>> tmap(lambda x: x*2, range(5), progress=True)

This progress bar will also handle asynchronous operations, and therefore it is perfectly legal to use both progress and parallel at the same time. Note: due to the un-predictability of asynchronous function calls, the estimated time to complete reported by the progress bar will not be exactly accurate.

If you want to customise the progress bar, you can pass a progress bar to the progress_fn argument. This also allows you to use a progress bar other than tqdm. This argument expects a callable function with a single argument – the data to iterate over. Typical usage of this argument is with partial or, as here, a lambda:

>>> from tqdm.auto import tqdm
>>> tmap(lambda x: x * 2, range(5), progress=True,
...      progress_fn=lambda lst: tqdm(lst, desc="Multiplying numbers by 2"))

When this progress bar appears, it will have the correct description.

Transducers

When working with pipes and transforming a collection of data, we often apply a transformation and pass the result to the next transformation. For example:

>>> from dfp import pipe
>>> pipe(
...     range(10),
...     lambda lst: tfilter(lambda x: x%2==0, lst),
...     lambda lst: tmap(lambda x: x*2, lst))
(0, 4, 8, 12, 16)

Notice that we have to create an anonymous function that takes the result of the previous transformation. This can become a little annoying, so we allow the lst to be optional. If it is not supplied, i.e. None, then tmap returns a transducer, a map that takes one argument (the list). This makes composition with transformation pipes much easier.

>>> pipe(
...     range(10),
...     tfilter(lambda x: x%2==0),
...     tmap(lambda x: x*2))
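The optional-list idea can be sketched as follows: when the iterable is omitted, the function returns a one-argument callable that waits for it. All three helpers below are hypothetical stand-ins written for illustration; they are not DFP's implementation.

```python
from functools import reduce


def pipe_sketch(data, *funs):
    # Thread the data through each function in turn.
    return reduce(lambda acc, fn: fn(acc), funs, data)


def tmap_sketch(f, lst=None):
    if lst is None:
        # No data yet: return a callable that waits for the data.
        return lambda later: tmap_sketch(f, later)
    return tuple(map(f, lst))


def tfilter_sketch(f, lst=None):
    if lst is None:
        # Same deferral as tmap_sketch.
        return lambda later: tfilter_sketch(f, later)
    return tuple(filter(f, lst))


result = pipe_sketch(
    range(10),
    tfilter_sketch(lambda x: x % 2 == 0),
    tmap_sketch(lambda x: x * 2),
)
print(result)  # (0, 4, 8, 12, 16)
```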
dfp.trace(f, before: Callable | None = None, after: Callable | None = None)

Trace the input and output of a function by wrapping it with trace.
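A wrapper of this kind might look like the sketch below. The hook signatures are assumptions made for illustration (before receives the call's arguments, after receives the result); the library's actual contract for before and after may differ, and trace_sketch is a hypothetical name.

```python
import functools


def trace_sketch(f, before=None, after=None):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        if before is not None:
            # Assumed contract: before sees the inputs.
            before(*args, **kwargs)
        result = f(*args, **kwargs)
        if after is not None:
            # Assumed contract: after sees the output.
            after(result)
        return result
    return wrapper


log = []
traced_double = trace_sketch(
    lambda x: x * 2,
    before=lambda *args, **kwargs: log.append(("in", args)),
    after=lambda result: log.append(("out", result)),
)

print(traced_double(3))  # 6
print(log)               # [('in', (3,)), ('out', 6)]
```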

dfp.treduce(*args, **kwargs)

Eagerly apply a reduce operation, returning a tuple if the result is not a singleton.

Parameters:
  • f – The reduction function f(x, y) -> z, e.g. lambda sum, element: sum + element

  • lst – The iterable to reduce.

  • init – The initial value before applying f for the first time.

Returns:

A singleton element or a tuple of elements as defined by how f reduces the iterable.

>>> from src.dfp import treduce
>>> treduce(lambda sum, element: sum + element, range(10), 0)
45

Reduce operations are very general, in that it’s possible to re-implement the map and filter methods.

>>> treduce(lambda lst, el: lst + [el] if el%2==0 else lst, range(10), [])
(0, 2, 4, 6, 8)
dfp.tzip(*lst) tuple

Eagerly zip iterables returning a tuple.

Parameters:

lst (Iterable) – The iterables to zip together.

Returns:

Tuple of zipped elements.

>>> from src.dfp import tzip
>>> tzip(['a', 'b'], [1, 2])
(('a', 1), ('b', 2))
dfp.unique(lst, key: str | None = None, how=<function <lambda>>)

Return the unique elements using key as the key to determine unique elements. If multiple elements with the same value exist, the latter in the sequence will be returned.