From b15b7ad194a3053433634dd4d6614e5ff96cc24b Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 11 Sep 2024 11:03:05 -0400 Subject: [PATCH 1/2] Alternative merge algorithm without the builder --- src/akimbo/io.py | 46 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/src/akimbo/io.py b/src/akimbo/io.py index 8fa36a3..30fb10d 100644 --- a/src/akimbo/io.py +++ b/src/akimbo/io.py @@ -2,6 +2,7 @@ import awkward as ak import fsspec +import numpy as np def ak_to_series(ds, backend="pandas", extract=True): @@ -223,10 +224,45 @@ def join( merge = _jitted[0] else: merge = _merge - builder = ak.ArrayBuilder() - merge(table1[key], table2[key], builder) - merge_index = builder.snapshot() - indexed = table2[ak.flatten(merge_index)] - counts = ak.num(merge_index) + # builder = ak.ArrayBuilder() + # merge(table1[key], table2[key], builder) + # merge_index = builder.snapshot() + # indexed = table2[ak.flatten(merge_index)] + # counts = ak.num(merge_index) + + counts, matches = merge(table1[key], table2[key]) + indexed = table2[matches] listy = ak.unflatten(indexed, counts) return ak.with_field(table1, listy, colname) + + +def _merge(ind1, ind2): + len2 = len(ind2) + counts = np.empty(len(ind1), dtype="uint32") + matches = np.empty(len2, dtype="uint64") + j = 0 + offind = 0 + matchind = 0 + last = 0 + for i in ind1: + while True: + if j >= len2: + break + if i > ind2[j]: + # ID not yet found + j += 1 + continue + if i < ind2[j]: + # no more entrie + break + # hit + while True: + matches[matchind] = j + j += 1 + matchind += 1 + if j >= len2 or i != ind2[j]: + break + counts[offind] = matchind - last + last = matchind + offind += 1 + return counts, matches From 7a6acba8d26c8711c72d99fcb893c8f6cdc85bdb Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 11 Sep 2024 14:35:02 -0400 Subject: [PATCH 2/2] resize matches --- src/akimbo/io.py | 44 +++++++++----------------------------------- src/akimbo/mixin.py | 4 +++- 2 files changed, 12 insertions(+), 36 deletions(-) diff --git a/src/akimbo/io.py b/src/akimbo/io.py index 30fb10d..c264722 100644 --- a/src/akimbo/io.py +++ b/src/akimbo/io.py @@ -160,31 +160,6 @@ def get_avro_schema( return form -def _merge(ind1, ind2, builder): - """numba jittable left join/merge index finder""" - len2 = len(ind2) - j = 0 - for i in ind1: - builder.begin_list() - while True: - if j >= len2: - break - if i > ind2[j]: - # ID not yet found - j += 1 - continue - if i < ind2[j]: - # no more entrie - break - # hit - while True: - builder.append(j) - j += 1 - if j >= len2 or i != ind2[j]: - break - builder.end_list() - - _jitted = [None] @@ -224,22 +199,21 @@ def join( merge = _jitted[0] else: merge = _merge - # builder = ak.ArrayBuilder() - # merge(table1[key], table2[key], builder) - # merge_index = builder.snapshot() - # indexed = table2[ak.flatten(merge_index)] - # counts = ak.num(merge_index) - counts, matches = merge(table1[key], table2[key]) + counts = np.empty(len(table1), dtype="uint64") + # TODO: the line below over-allocates, can swithch to somehing growable + matches = np.empty(len(table2), dtype="uint64") + # TODO: to_numpy(allow_missong) makes this a bit faster, but is not + # not GPU general + counts, matches, ind = merge(table1[key], table2[key], counts, matches) + matches.resize(int(ind), refcheck=False) indexed = table2[matches] listy = ak.unflatten(indexed, counts) return ak.with_field(table1, listy, colname) -def _merge(ind1, ind2): +def _merge(ind1, ind2, counts, matches): len2 = len(ind2) - counts = np.empty(len(ind1), dtype="uint32") - matches = np.empty(len2, dtype="uint64") j = 0 offind = 0 matchind = 0 @@ -265,4 +239,4 @@ def _merge(ind1, ind2): counts[offind] = matchind - last last = matchind offind += 1 - return counts, matches + return counts, matches, matchind diff --git a/src/akimbo/mixin.py b/src/akimbo/mixin.py index a6d1296..82faf73 100644 --- a/src/akimbo/mixin.py +++ b/src/akimbo/mixin.py @@ -238,7 +238,9 @@ def to_arrow(cls, data): @property def array(self) -> ak.Array: """Data as an awkward array""" - return ak.with_name(ak.from_arrow(self.arrow), self._behavior) + if self._behavior: + return ak.with_name(ak.from_arrow(self.arrow), self._behavior) + return ak.from_arrow(self.arrow) @classmethod def register_accessor(cls, name, klass):