Challenge: merge two sorted iterators into one

Warning: if you are currently crossing a road, be sure to reach the side before reading further!

Challenge

  • Given two iterables / iterators / generators (henceforth: producers)
  • that produce elements in strictly increasing order
  • (i.e. every next element is greater than the previous one, and there are no duplicates),
  • construct a generator that produces all elements of the given producers
  • in strictly increasing order and without duplicates.

You may not assume the given producers are finite.

Stub:

# In Python, but by all means feel free to
# solve this problem using another language.

def merge(xs, ys):
    """Merge two sorted-without-duplicates iterables
    into a single sorted-without-duplicates generator.
    """
    ...
Python test suite
def i(start, stop):
    """inclusive range"""
    return range(start, stop + 1)


def squares():
    n = 0
    while True:
        yield n * n
        n += 1


def triangles():
    n = 0
    while True:
        yield n * (n + 1) // 2
        n += 1


FINITE_TEST_DATA = (
    # overlap of 3
    ("1–5  3–7", i(1, 5), i(3, 7), (1, 2, 3, 4, 5, 6, 7)),
    ("3–7  1–5", i(3, 7), i(1, 5), (1, 2, 3, 4, 5, 6, 7)),
    # overlap of 2
    ("1–4  3–7", i(1, 4), i(3, 7), (1, 2, 3, 4, 5, 6, 7)),
    ("3–7  1–4", i(3, 7), i(1, 4), (1, 2, 3, 4, 5, 6, 7)),
    # overlap of 1
    ("1–4  4–7", i(1, 4), i(4, 7), (1, 2, 3, 4, 5, 6, 7)),
    ("4–7  1–4", i(4, 7), i(1, 4), (1, 2, 3, 4, 5, 6, 7)),
    # seamless
    ("1–3  4–6", i(1, 3), i(4, 6), (1, 2, 3, 4, 5, 6)),
    ("4–6  1–3", i(4, 6), i(1, 3), (1, 2, 3, 4, 5, 6)),
    # gap of 1
    ("1–3  5–7", i(1, 3), i(5, 7), (1, 2, 3, 5, 6, 7)),
    ("5–7  1–3", i(5, 7), i(1, 3), (1, 2, 3, 5, 6, 7)),
    # gap of 2
    ("1–3  6–8", i(1, 3), i(6, 8), (1, 2, 3, 6, 7, 8)),
    ("6–8  1–3", i(6, 8), i(1, 3), (1, 2, 3, 6, 7, 8)),
    # alternating
    ("0,2–8  1,3–7", range(0, 8, 2), range(1, 8, 2), (0, 1, 2, 3, 4, 5, 6, 7)),
    ("1,3–7  0,2–8", range(1, 8, 2), range(0, 8, 2), (0, 1, 2, 3, 4, 5, 6, 7)),
    # emptiness
    ("–  1–3", range(0), i(1, 3), (1, 2, 3)),
    ("1–3  –", i(1, 3), range(0), (1, 2, 3)),
    ("–  –", range(0), range(0), ()),
    # misc.
    ("0–9  0,2–8", range(0, 10), range(0, 10, 2), (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)),
    ("0,2–8  0–9", range(0, 10, 2), range(0, 10), (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)),
)


for name, left, right, desired in FINITE_TEST_DATA:
    actual = (*merge(left, right),)
    if actual != desired:
        print(f"{name = }\n  {desired = }\n  {actual  = }")

# can handle infinite input as well
m = merge(squares(), triangles())
s = [next(m) for _ in range(20)]
if s != [0, 1, 3, 4, 6, 9, 10, 15, 16, 21, 25, 28, 36, 45, 49, 55, 64, 66, 78, 81]:
    print(
        "If you can read this something weird is going on,\n"
        "  as either a crash or success is expected."
    )

Haskell test suite
module Main where
-- To run the tests:
--   $ runghc Main.hs

import Control.Monad

merge :: Ord a => [a] -> [a] -> [a]
merge = error "you need to implement this function"

main :: IO ()
main = do
  forM_ finiteTests $ \(name, xs, ys, desired) -> do
    let actual = merge xs ys
    when (actual /= desired) $ do
      putStrLn $ "name = " ++ name
      putStrLn $ "  desired = " ++ show desired
      putStrLn $ "  actual  = " ++ show actual
  let m = merge squares triangles
  when (take 20 m /= [0, 1, 3, 4, 6, 9, 10, 15, 16, 21, 25, 28, 36, 45, 49, 55, 64, 66, 78, 81]) $ do
    putStrLn "If you can read this something weird is going on,"
    putStrLn "  as either a crash or success is expected."

finiteTests :: [(String, [Integer], [Integer], [Integer])]
finiteTests =
  [ -- overlap of 3
    ("1–5  3–7", [1 .. 5], [3 .. 7], [1, 2, 3, 4, 5, 6, 7]),
    ("3–7  1–5", [3 .. 7], [1 .. 5], [1, 2, 3, 4, 5, 6, 7]),
    -- overlap of 2
    ("1–4  3–7", [1 .. 4], [3 .. 7], [1, 2, 3, 4, 5, 6, 7]),
    ("3–7  1–4", [3 .. 7], [1 .. 4], [1, 2, 3, 4, 5, 6, 7]),
    -- overlap of 1
    ("1–4  4–7", [1 .. 4], [4 .. 7], [1, 2, 3, 4, 5, 6, 7]),
    ("4–7  1–4", [4 .. 7], [1 .. 4], [1, 2, 3, 4, 5, 6, 7]),
    -- seamless
    ("1–3  4–6", [1 .. 3], [4 .. 6], [1, 2, 3, 4, 5, 6]),
    ("4–6  1–3", [4 .. 6], [1 .. 3], [1, 2, 3, 4, 5, 6]),
    -- gap of 1
    ("1–3  5–7", [1 .. 3], [5 .. 7], [1, 2, 3, 5, 6, 7]),
    ("5–7  1–3", [5 .. 7], [1 .. 3], [1, 2, 3, 5, 6, 7]),
    -- gap of 2
    ("1–3  6–8", [1 .. 3], [6 .. 8], [1, 2, 3, 6, 7, 8]),
    ("6–8  1–3", [6 .. 8], [1 .. 3], [1, 2, 3, 6, 7, 8]),
    -- alternating
    ("0,2–8  1,3–7", [0, 2 .. 7], [1, 3 .. 7], [0, 1, 2, 3, 4, 5, 6, 7]),
    ("1,3–7  0,2–8", [1, 3 .. 7], [0, 2 .. 7], [0, 1, 2, 3, 4, 5, 6, 7]),
    -- emptiness
    ("–  1–3", [], [1 .. 3], [1, 2, 3]),
    ("1–3  –", [1 .. 3], [], [1, 2, 3]),
    ("–  –", [], [], []),
    -- misc.
    ("0–9  0,2–8", [0 .. 9], [0, 2 .. 9], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
    ("0,2–8  0–9", [0, 2 .. 9], [0 .. 9], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
  ]

squares, triangles :: [Integer]
squares = map (^ 2) [0 ..]
triangles = map (\n -> n * (n + 1) `div` 2) [0 ..]

Whence this problem?

I’m working on Approaches for Sum of Multiples. This subproblem turned up. I already solved it myself, but am curious to see what other approaches exist.

1 Like

Disclosure: I’m about to write code without testing any of it.

Code
import itertools

squares = (i * i for i in itertools.count())
triangles = (n * (n + 1) // 2 for n in itertools.count())
i = lambda a, b: range(a, b + 1)

def merge(a, b):
    na = next(a, None)
    nb = next(b, None)
    while na is not None and nb is not None:
        if na <= nb:
            yield na
            na = next(a, None)
        else:
            yield nb
            nb = next(b, None)
    if na is not None:
        val, it = na, a
    else:
        val, it = nb, b
    yield val
    yield from it

Code that actually passes the tests:

Solution
def merge(xs, ys):
    # Convert iterables to iterators.
    x_iter = iter(xs)
    y_iter = iter(ys)
    # Pop a value from both iterators.
    x_val = next(x_iter, None)
    y_val = next(y_iter, None)

    while x_val is not None or y_val is not None:
        if y_val is None:
            yield x_val
            x_val = next(x_iter, None)
        else:
            if x_val is None or y_val < x_val:
                yield y_val
                y_val = next(y_iter, None)
            else:
                if x_val < y_val:
                    yield x_val
                x_val = next(x_iter, None)


def merge(xs, ys):
    # Convert iterables to iterators.
    x_iter = iter(xs)
    y_iter = iter(ys)
    # Pop a value from both iterators.
    x_val = next(x_iter, None)
    y_val = next(y_iter, None)

    # Return early on empty iterators.
    if x_val is None and y_val is None:
        return

    # Yield one value at a time
    while x_val is not None and y_val is not None:
        if x_val == y_val:
            x_val = next(x_iter, None)
        elif x_val <= y_val:
            yield x_val
            x_val = next(x_iter, None)
        else:
            yield y_val
            y_val = next(y_iter, None)

    val, it = (x_val, x_iter) if x_val is not None else (y_val, y_iter)
    yield val
    yield from it

I added a Haskell stub & test suite to the first post.


My own solutions:

Consume & switch (Python)
def merge(xs, ys):
    xs, ys = iter(xs), iter(ys)
    e = next(xs, ABSENT := object())
    if e is ABSENT:
        yield from ys
        return
    while True:
        for y in ys:
            if y < e:
                yield y
            elif e < y:
                yield e
                e = y
                xs, ys = ys, xs
                break
        else:  # ys is exhausted
            yield e
            yield from xs
            return
Consume & switch (Haskell)
-- All three versions are essentially the same

merge :: Ord a => [a] -> [a] -> [a]
merge [] ys = ys
merge xs@(x : xs') ys = go ys
  where
    go [] = xs
    go ys@(y : ys') = case compare y x of
      LT -> y : go ys'
      EQ -> go ys'
      GT -> x : merge ys xs'

-- or with `go` in-lined
merge :: Ord a => [a] -> [a] -> [a]
merge [] ys = ys
merge xs [] = xs
merge xs@(x : xs') ys@(y : ys') = case compare y x of
  LT -> y : merge xs ys'
  EQ -> merge xs ys'
  GT -> x : merge ys xs'

-- or using `span` and `dropWhile` to the same effect
merge :: Ord a => [a] -> [a] -> [a]
merge [] ys = ys
merge (x : xs) ys = prefix ++ x : merge ys' xs
  where
    (prefix, ys') = dropWhile (x ==) <$> span (< x) ys
Equal treatment (Python)
def merge(xs, ys):
    xs, ys = iter(xs), iter(ys)
    ABSENT = object()
    x = next(xs, ABSENT)
    y = next(ys, ABSENT)
    while x is not ABSENT and y is not ABSENT:
        if x < y:
            yield x
            x = next(xs, ABSENT)
        elif x == y:
            yield x
            x, y = next(xs, ABSENT), next(ys, ABSENT)
        else:
            yield y
            y = next(ys, ABSENT)
    yield from (_ for _ in (x, y) if _ is not ABSENT)
    yield from xs
    yield from ys
Equal treatment (Haskell)
merge :: Ord a => [a] -> [a] -> [a]
merge [] ys = ys
merge xs [] = xs
merge xs@(x : xs') ys@(y : ys') =
  case compare x y of
    LT -> x : merge xs' ys
    EQ -> x : merge xs' ys'
    GT -> y : merge xs ys'

Observation: I first wrote consume & switch and then equal treatment, both in Python. I knew beforehand that equal treatment would be easy in Haskell, but I avoided it in Python because it seemed like a hassle there. Only after inventing consume & switch in Python I wrote its Haskell versions, and I might well have overlooked this strategy had I not used Python earlier. It seems like both languages had a significant influence on how I thought (or wished to think) about the problem.