import os


rule all:
    # First rule in the file, so Snakemake treats it as the default target.
    # It produces nothing itself; requesting "aggregated.txt" pulls in the
    # rule that creates that file and everything that rule depends on.
    input:
        "aggregated.txt"

# The number of pieces (and therefore the process_piece / aggregate jobs) is
# not known until the checkpoint has run, so these rules are absent from the
# initial run_info event and only appear once the DAG is re-evaluated.
checkpoint split:
    # Declared as a checkpoint rather than a plain rule so that input
    # functions querying it (via checkpoints.split.get) are re-evaluated
    # once this job has finished.
    #
    # The output is the whole "pieces" directory; the individual piece files
    # are not listed because they are discovered only after the job has run.
    output:
        directory("pieces")
    # Creates the output directory, writes piece_0.txt, piece_1.txt and
    # piece_2.txt into it (each containing "piece <i>"), then pauses 0.2 s.
    shell:
        """
        mkdir -p {output}
        for i in 0 1 2; do echo "piece $i" > {output}/piece_$i.txt; done
        sleep 0.2
        """

rule process_piece:
    # One job per piece: the wildcard {i} ties each input file
    # pieces/piece_{i}.txt to its output processed_{i}.txt.
    input:
        "pieces/piece_{i}.txt"
    output:
        "processed_{i}.txt"
    message:
        "Processing piece {wildcards.i}"
    # Copies the piece unchanged into the output file, then pauses 0.2 s.
    shell:
        """
        cat {input} > {output}
        sleep 0.2
        """

def aggregate_input(wildcards):
    """Return the processed files that the ``aggregate`` rule depends on.

    The list is derived from the piece files found in the output directory
    of the ``split`` checkpoint, so it can only be computed after that
    checkpoint has run. Per the Snakemake checkpoint documentation,
    ``checkpoints.split.get`` signals Snakemake to defer evaluation until
    the checkpoint has completed.

    Args:
        wildcards: Wildcards of the requesting job, forwarded unchanged to
            ``checkpoints.split.get``.

    Returns:
        A list of ``processed_{i}.txt`` paths, one per ``piece_{i}.txt``
        found in the checkpoint output directory, sorted by piece id.
    """
    checkpoint_output = checkpoints.split.get(**wildcards).output[0]
    pieces = glob_wildcards(os.path.join(checkpoint_output, "piece_{i}.txt")).i
    # Directory listing order depends on the filesystem. Sort the ids so the
    # order of {input} in the aggregate rule, and therefore the contents of
    # the concatenated file, is the same on every run.
    return expand("processed_{i}.txt", i=sorted(pieces))

rule aggregate:
    # The input is a function rather than a fixed list: aggregate_input
    # derives the processed_{i}.txt files from the piece files present in
    # the output directory of the split checkpoint.
    input:
        aggregate_input
    output:
        "aggregated.txt"
    # Concatenates every input file into the single output file.
    shell:
        "cat {input} > {output}"
