threadx - Create elegant data transformation pipelines. It lets you thread values through a sequence of operations with a sense of clarity and simplicity that feels natural. And it all revolves around two key elements:
- thread: Passes the result of each step as the input to the next.
- x: A smart placeholder that knows exactly where to inject the previous result, whether in a method call, item lookup, or even unpacking.
Here’s what it looks like in action:
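import json
from threadx import thread, x
# read_file is not part of threadx; it is assumed to be your own helper that returns the file's text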
thread('./data.log',
read_file,
x.splitlines,
(map, x.strip, x),
(map, json.loads, x),
(map, x['time'], x),
sum)
What’s happening here? The file is read and split into lines, each line is stripped and parsed as JSON, and the "time" values are summed, all in a linear and readable way. No intermediate variables, no nesting, just the data flowing from one step to the next.
The data.log file (generated by inspector) contains entries like this:
{"time": 12000, "fn": "foo", ...}
{"time": 12345, "fn": "bar", ...}
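For comparison, here is roughly the same computation in plain Python, with each line annotated with the pipeline step it mirrors. This is only a sketch: it assumes one JSON object per line, uses two in-memory sample lines (trimmed to the "time" and "fn" fields) instead of reading ./data.log, and total_time is a hypothetical name.

```python
import json

# Two sample lines standing in for ./data.log (only the fields shown above).
SAMPLE_LOG = '{"time": 12000, "fn": "foo"}\n{"time": 12345, "fn": "bar"}\n'


def total_time(text):
    """Sum the "time" field across newline-delimited JSON records."""
    lines = text.splitlines()                             # x.splitlines
    stripped = map(str.strip, lines)                      # (map, x.strip, x)
    records = map(json.loads, stripped)                   # (map, json.loads, x)
    times = map(lambda record: record["time"], records)   # (map, x['time'], x)
    return sum(times)                                     # sum


result = total_time(SAMPLE_LOG)  # => 24345
```

Every step needs its own variable name here, which is exactly the noise the threaded version avoids.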
What Makes threadx Interesting?
- Readable Flow: Instead of diving into layers of nested calls, you write each transformation as a clear, sequential step.
- The x Factor: x acts as a placeholder for where the output of the previous step goes. It’s surprisingly flexible, supporting method calls, attribute/item lookups, and more.
- No Extra Variables: Avoid the noise of intermediate variables or lambda functions. Your transformations stay clean and minimal.
pip install threadx
from threadx import thread, x, stop
thread allows you to pass the result of the previous step automatically as the first argument in each new function:
thread([1, 2, 3], # => [1, 2, 3]
sum, # => 6
str) # => '6'
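If you are curious how this kind of threading can work, the core idea fits in a few lines. The sketch below is not threadx's implementation; thread_first is a hypothetical name, and it only handles plain callables (no tuples, no placeholder).

```python
from functools import reduce


def thread_first(value, *steps):
    """Feed value through each callable in turn, left to right."""
    return reduce(lambda acc, step: step(acc), steps, value)


result = thread_first([1, 2, 3], sum, str)  # => '6'
```

Each step receives whatever the previous step returned, so the call reads in execution order instead of inside out like str(sum([1, 2, 3])).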
Or, be explicit about it:
thread([1, 2, 3],
(sum, x),
(str, x))
Want to pass the result into a different argument position? No problem:
thread(10,
(range, x, 20, 3), # same as (range, 20, 3)
list) # => [10, 13, 16, 19]
thread(20,
(range, 10, x, 3),
list) # => [10, 13, 16, 19]
thread(3,
(range, 10, 20, x),
list) # => [10, 13, 16, 19]
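The argument-position rule above can be sketched in plain Python. This is an illustration under stated assumptions, not how threadx is actually written: SLOT, run_step, and thread_sketch are hypothetical names, and SLOT is a bare marker object rather than the real x.

```python
class _Slot:
    """Hypothetical marker meaning 'put the previous result here'."""


SLOT = _Slot()


def run_step(step, value):
    """Apply one step to value.

    A bare callable receives value as its only argument. A tuple is
    (function, *args): every SLOT in args is replaced by value, and if
    no SLOT is present, value is passed as the first argument.
    """
    if not isinstance(step, tuple):
        return step(value)
    func, *args = step
    if any(arg is SLOT for arg in args):
        args = [value if arg is SLOT else arg for arg in args]
    else:
        args = [value, *args]
    return func(*args)


def thread_sketch(value, *steps):
    """Run value through every step in order."""
    for step in steps:
        value = run_step(step, value)
    return value


first = thread_sketch(10, (range, SLOT, 20, 3), list)   # => [10, 13, 16, 19]
middle = thread_sketch(20, (range, 10, SLOT, 3), list)  # => [10, 13, 16, 19]
last = thread_sketch(3, (range, 10, 20, SLOT), list)    # => [10, 13, 16, 19]
```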
Unpacking works as usual:
thread([10, 20],
(range, *x, 3), # unpack to (range, 10, 20, 3)
list) # => [10, 13, 16, 19]
Use x.method_name for method calls, just like magic.
thread(['a', 'b'],
(x.index, 'a')) # => 0
thread(['a', 'b'],
(x.count, 'b')) # => 1
Use x.attribute_name to look up class and instance attributes.
thread({'a': 1, 'b': 2},
x.keys,
list) # => ['a', 'b']
data = {'a': {'b': [1, 2, 3, 4]}}
thread(data,
x['a'],
x['b'][0]) # => 1
thread(data,
x['a']['b'][:2]) # => [1, 2]
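One way a placeholder can support chained lookups like these is to record each access and replay it later against the real value. The sketch below shows that general technique only; Recorder and replay are hypothetical names, and nothing here is taken from threadx's source.

```python
class Recorder:
    """Hypothetical placeholder that records lookups and replays them later."""

    def __init__(self, ops=()):
        self._ops = ops  # tuple of ("attr" | "item", argument) pairs

    def __getattr__(self, name):
        # Only called for names not found normally, so _ops and replay still work.
        return Recorder(self._ops + (("attr", name),))

    def __getitem__(self, key):
        return Recorder(self._ops + (("item", key),))

    def replay(self, value):
        """Apply the recorded lookups to value, in the order they were made."""
        for kind, arg in self._ops:
            value = getattr(value, arg) if kind == "attr" else value[arg]
        return value


p = Recorder()
data = {'a': {'b': [1, 2, 3, 4]}}

first_item = p['a']['b'][0].replay(data)   # => 1
first_two = p['a']['b'][:2].replay(data)   # => [1, 2]
real_part = p.real.replay(7)               # => 7
```

Because every lookup returns a new Recorder, the chain is just data until replay is called with an actual value.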
Easily inspect intermediate results using stop. Useful for debugging.
thread(data,
x['a'],
x['b'],
stop, # => [1, 2, 3, 4], Stop and return for inspection
sum, # This won’t be executed
str)
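Early exit like this is commonly done with a sentinel object that the loop checks for before running each step. The sketch below illustrates that pattern with hypothetical names (STOP, thread_until_stop) and plain callables; it makes no claim about how threadx implements stop.

```python
STOP = object()  # hypothetical sentinel standing in for stop


def thread_until_stop(value, *steps):
    """Run steps in order, returning early when the STOP sentinel is reached."""
    for step in steps:
        if step is STOP:
            break
        value = step(value)
    return value


data = {'a': {'b': [1, 2, 3, 4]}}
result = thread_until_stop(
    data,
    lambda d: d['a'],
    lambda d: d['b'],
    STOP,   # => [1, 2, 3, 4]; the steps below never run
    sum,
    str,
)
```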
Remove verbose lambdas in simple cases.
data = [[1, 2, 3, 4], [10, 20, 30, 40]]
# Normal way:
thread(data,
(map, lambda i: i[0], x),
list) # => [1, 10]
# or
thread(data,
(map, x[0], x),
list) # => [1, 10]
# Normal way:
thread(range(12),
(filter, lambda i: i % 2 == 0, x),
list) # => [0, 2, 4, 6, 8, 10]
# or
thread(range(12),
(filter, x % 2 == 0, x),
list) # => [0, 2, 4, 6, 8, 10]
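For reference, the standard library has its own lambda-free spellings for some of these cases. The snippet below uses operator.itemgetter and a comprehension, with no threadx involved, so you can compare the styles side by side.

```python
from operator import itemgetter

data = [[1, 2, 3, 4], [10, 20, 30, 40]]

# itemgetter(0) builds a callable equivalent to lambda i: i[0]
firsts = list(map(itemgetter(0), data))        # => [1, 10]

# A comprehension replaces filter plus lambda for simple predicates
evens = [i for i in range(12) if i % 2 == 0]   # => [0, 2, 4, 6, 8, 10]
```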
# A pipeline is just a tuple or list of steps, so you can store it and reuse it
pipeline = (read_file,
x.splitlines,
(map, x.strip, x),
(map, json.loads, x),
(map, x['time'], x),
sum)
thread('./data.log', *pipeline) # works just like any other function.
After spending a few years working with Clojure, I found myself missing its threading macros when I returned to Python (for a side project). Sure, Python has some tools for chaining operations, but nothing quite as elegant or powerful as what I was used to.