Skip to content

csttool.extract

Atlas-based ROI filtering that isolates the bilateral CST from a whole-brain tractogram. The CLI counterpart is extract.

from csttool.extract import extract_cst

extract_cst(
    tractogram="tracking/whole_brain.trk",
    fa="tracking/dti_FA.nii.gz",
    out="./extract",
    extraction_method="endpoint",
)

csttool.extract

Functions

extract_bilateral_cst(streamlines, masks, affine, min_length=20.0, max_length=200.0, verbose=True)

Extract bilateral CST from whole-brain tractogram.

Filters streamlines that connect brainstem to motor cortex, separated by hemisphere.

Parameters:

Name Type Description Default
streamlines Streamlines

Whole-brain tractogram.

required
masks dict

ROI masks from create_cst_roi_masks(): - 'brainstem': Brainstem mask - 'motor_left': Left motor cortex mask - 'motor_right': Right motor cortex mask

required
affine (ndarray, shape(4, 4))

Affine matrix for the masks.

required
min_length float

Minimum streamline length in mm. Default is 20.

20.0
max_length float

Maximum streamline length in mm. Default is 200.

200.0
verbose bool

Print progress information.

True

Returns:

Name Type Description
result dict

Dictionary containing: - 'cst_left': Left CST streamlines - 'cst_right': Right CST streamlines - 'cst_combined': All CST streamlines - 'left_indices': Indices of left CST in original - 'right_indices': Indices of right CST in original - 'stats': Extraction statistics

Source code in src/csttool/extract/modules/endpoint_filtering.py
def extract_bilateral_cst(
    streamlines,
    masks,
    affine,
    min_length=20.0,
    max_length=200.0,
    verbose=True
):
    """
    Extract bilateral CST from whole-brain tractogram.

    Filters streamlines that connect brainstem to motor cortex,
    separated by hemisphere.

    Parameters
    ----------
    streamlines : Streamlines
        Whole-brain tractogram.
    masks : dict
        ROI masks from create_cst_roi_masks():
        - 'brainstem': Brainstem mask
        - 'motor_left': Left motor cortex mask
        - 'motor_right': Right motor cortex mask
    affine : ndarray, shape (4, 4)
        Affine matrix for the masks.
    min_length : float, optional
        Minimum streamline length in mm. Default is 20.
    max_length : float, optional
        Maximum streamline length in mm. Default is 200.
    verbose : bool, optional
        Print progress information.

    Returns
    -------
    result : dict
        Dictionary containing:
        - 'cst_left': Left CST streamlines
        - 'cst_right': Right CST streamlines
        - 'cst_combined': All CST streamlines
        - 'left_indices': Indices of left CST in original
        - 'right_indices': Indices of right CST in original
        - 'stats': Extraction statistics
    """
    from dipy.tracking.streamline import length

    if verbose:
        print("=" * 60)
        print("EXTRACT BILATERAL CST")
        print("=" * 60)
        print(f"\nInput: {len(streamlines):,} streamlines")

    # -------------------------------------------------------------------------
    # Step 1: Length filtering
    # -------------------------------------------------------------------------
    if verbose:
        print(f"\n[Step 1/3] Length filtering ({min_length}-{max_length} mm)...")

    lengths = np.array([length(sl) for sl in streamlines])
    length_mask = (lengths >= min_length) & (lengths <= max_length)
    length_valid_indices = np.where(length_mask)[0]

    streamlines_length_filtered = Streamlines([streamlines[i] for i in length_valid_indices])

    if verbose:
        print(f"  → {len(streamlines):,}{len(streamlines_length_filtered):,} streamlines")
    if verbose:
        print(f"    • Removed: {len(streamlines) - len(streamlines_length_filtered):,} (too short/long)")

    # -------------------------------------------------------------------------
    # Step 2: Extract Left CST (brainstem ↔ motor_left)
    # -------------------------------------------------------------------------
    if verbose:
        print("\n[Step 2/3] Extracting Left CST (brainstem ↔ motor_left)...")

    cst_left, left_indices_filtered = filter_streamlines_by_endpoints(
        streamlines_length_filtered,
        masks['brainstem'],
        masks['motor_left'],
        affine,
        verbose=verbose
    )

    # Map back to original indices
    left_indices_original = length_valid_indices[left_indices_filtered]

    # -------------------------------------------------------------------------
    # Step 3: Extract Right CST (brainstem ↔ motor_right)
    # -------------------------------------------------------------------------
    if verbose:
        print("\n[Step 3/3] Extracting Right CST (brainstem ↔ motor_right)...")

    cst_right, right_indices_filtered = filter_streamlines_by_endpoints(
        streamlines_length_filtered,
        masks['brainstem'],
        masks['motor_right'],
        affine,
        verbose=verbose
    )

    # Map back to original indices
    right_indices_original = length_valid_indices[right_indices_filtered]

    # -------------------------------------------------------------------------
    # Combine results
    # -------------------------------------------------------------------------
    cst_combined = Streamlines(list(cst_left) + list(cst_right))

    # Statistics
    stats = {
        'total_input': len(streamlines),
        'after_length_filter': len(streamlines_length_filtered),
        'cst_left_count': len(cst_left),
        'cst_right_count': len(cst_right),
        'cst_total_count': len(cst_combined),
        'extraction_rate': len(cst_combined) / len(streamlines) * 100 if len(streamlines) > 0 else 0,
        'left_right_ratio': len(cst_left) / len(cst_right) if len(cst_right) > 0 else float('inf'),
        'min_length': min_length,
        'max_length': max_length
    }

    if verbose:
        print("\n" + "=" * 60)
        print("  ✓ CST extraction complete")
        print("=" * 60)
        print("\nResults:")
        print(f"  Left CST:  {stats['cst_left_count']:,} streamlines")
        print(f"  Right CST: {stats['cst_right_count']:,} streamlines")
        print(f"  Total:     {stats['cst_total_count']:,} streamlines")
    if verbose:
        print("\nDiagnostics:")
        print(f"    • Extraction rate: {stats['extraction_rate']:.2f}%")
        print(f"    • L/R ratio: {stats['left_right_ratio']:.2f}")

    return {
        'cst_left': cst_left,
        'cst_right': cst_right,
        'cst_combined': cst_combined,
        'left_indices': left_indices_original,
        'right_indices': right_indices_original,
        'stats': stats
    }

extract_cst_passthrough(streamlines, masks, affine, min_length=20.0, max_length=200.0, verbose=True)

Extract bilateral CST using pass-through filtering.

A streamline is included if it passes through BOTH the brainstem AND the corresponding motor cortex at any point along its length.

Parameters:

Name Type Description Default
streamlines Streamlines

Whole-brain tractogram.

required
masks dict

ROI masks with keys: 'brainstem', 'motor_left', 'motor_right'

required
affine (ndarray, shape(4, 4))

Affine matrix for masks.

required
min_length float

Minimum streamline length in mm.

20.0
max_length float

Maximum streamline length in mm.

200.0
verbose bool

Print progress information.

True

Returns:

Name Type Description
result dict

Dictionary with 'cst_left', 'cst_right', 'cst_combined', 'stats'

Source code in src/csttool/extract/modules/passthrough_filtering.py
def extract_cst_passthrough(
    streamlines,
    masks,
    affine,
    min_length=20.0,
    max_length=200.0,
    verbose=True
):
    """
    Extract bilateral CST using pass-through filtering.

    A streamline is included if it passes through BOTH the brainstem
    AND the corresponding motor cortex at any point along its length.

    Parameters
    ----------
    streamlines : Streamlines
        Whole-brain tractogram.
    masks : dict
        ROI masks with keys: 'brainstem', 'motor_left', 'motor_right'
    affine : ndarray, shape (4, 4)
        Affine matrix for masks.
    min_length : float
        Minimum streamline length in mm.
    max_length : float
        Maximum streamline length in mm.
    verbose : bool
        Print progress information.

    Returns
    -------
    result : dict
        Dictionary with 'cst_left', 'cst_right', 'cst_combined', 'stats'
    """
    if verbose:
        print("=" * 60)
        print("PASS-THROUGH CST EXTRACTION")
        print("=" * 60)
        print(f"\nInput: {len(streamlines):,} streamlines")

    brainstem = masks['brainstem']
    motor_left = masks['motor_left']
    motor_right = masks['motor_right']

    # Length filtering
    if verbose:
        print(f"\n[Step 1/2] Length filtering ({min_length}-{max_length} mm)...")

    lengths = np.array([length(sl) for sl in streamlines])
    length_mask = (lengths >= min_length) & (lengths <= max_length)
    valid_indices = np.where(length_mask)[0]
    streamlines_filtered = Streamlines([streamlines[i] for i in valid_indices])

    if verbose:
        print(f"    {len(streamlines):,}{len(streamlines_filtered):,} streamlines")

    # Analyze input tractogram hemisphere distribution (for asymmetry diagnosis)
    if verbose:
        print(f"\n    Analyzing input hemisphere distribution...")
    left_input = sum(1 for sl in streamlines_filtered if np.mean(sl[:, 0]) < 0)
    right_input = len(streamlines_filtered) - left_input
    lr_input = left_input / right_input if right_input > 0 else 0
    if verbose:
        print(f"    Input hemisphere distribution:")
        print(f"        Left (centroid X<0):  {left_input:,}")
        print(f"        Right (centroid X≥0): {right_input:,}")
        print(f"        L/R Ratio: {lr_input:.3f}")

    # Pass-through filtering
    if verbose:
        print(f"\n[Step 2/2] Pass-through filtering...")

    cst_left_list = []
    cst_right_list = []
    bilateral_excluded_count = 0
    midline_excluded_count = 0

    # Diagnostic counters for per-stage asymmetry analysis
    passes_brainstem_count = 0
    left_bs_count = 0  # Brainstem passes from left hemisphere streamlines
    right_bs_count = 0  # Brainstem passes from right hemisphere streamlines
    passes_left_motor_count = 0
    passes_right_motor_count = 0

    # Brainstem entry point diagnostics (to check classification alignment)
    left_entry_xs = []  # X coords of brainstem entry for left-classified streamlines
    right_entry_xs = []  # X coords of brainstem entry for right-classified streamlines

    # Inferior extent diagnostics (for streamlines failing brainstem check)
    left_fail_min_zs = []  # Min Z for left-classified streamlines that fail brainstem
    right_fail_min_zs = []  # Min Z for right-classified streamlines that fail brainstem

    for i, sl in enumerate(streamlines_filtered):
        passes_bs = streamline_passes_through(sl, brainstem, affine)

        if passes_bs:
            passes_brainstem_count += 1  # Count brainstem hits

            # Track hemisphere of brainstem-passing streamlines (using centroid X)
            sl_centroid_x = np.mean(sl[:, 0])
            if sl_centroid_x < 0:
                left_bs_count += 1
            else:
                right_bs_count += 1

            # Record brainstem entry X for diagnostic (classification alignment check)
            entry_x = get_first_brainstem_entry(sl, brainstem, affine)
            if entry_x is not None:
                if sl_centroid_x < 0:
                    left_entry_xs.append(entry_x)
                else:
                    right_entry_xs.append(entry_x)

            # Check mutual exclusivity (bilateral motor)
            passes_left = streamline_passes_through(sl, motor_left, affine)
            passes_right = streamline_passes_through(sl, motor_right, affine)

            # Count motor cortex hits (before exclusion logic)
            if passes_left:
                passes_left_motor_count += 1
            if passes_right:
                passes_right_motor_count += 1

            if passes_left and passes_right:
                bilateral_excluded_count += 1
                continue

            # Check midline crossing with tolerance for registration imperfection
            # This catches streamlines that grossly cross hemispheres (commissural)
            x_coords = sl[:, 0]
            x_min, x_max = np.min(x_coords), np.max(x_coords)
            MIDLINE_TOLERANCE_MM = 8.0  # Allow minor medial excursion

            # Only exclude if streamline has substantial extent on BOTH sides
            # i.e., it starts/ends deep in left AND goes deep into right
            if x_min < -MIDLINE_TOLERANCE_MM and x_max > MIDLINE_TOLERANCE_MM:
                midline_excluded_count += 1
                continue

            if passes_left:
                cst_left_list.append(sl)
            elif passes_right:
                cst_right_list.append(sl)
        else:
            # Streamline failed brainstem check - record inferior extent for diagnostic
            min_z = np.min(sl[:, 2])
            sl_centroid_x = np.mean(sl[:, 0])
            if sl_centroid_x < 0:
                left_fail_min_zs.append(min_z)
            else:
                right_fail_min_zs.append(min_z)

        if verbose and (i + 1) % 50000 == 0:
            print(f"    Processed {i + 1:,} / {len(streamlines_filtered):,}...")

    cst_left = Streamlines(cst_left_list)
    cst_right = Streamlines(cst_right_list)
    cst_combined = Streamlines(cst_left_list + cst_right_list)

    # Compute L/R ratios for stats
    lr_bs = left_bs_count / right_bs_count if right_bs_count > 0 else 0

    stats = {
        'total_input': len(streamlines),
        'after_length_filter': len(streamlines_filtered),
        # Input hemisphere distribution
        'input_left': left_input,
        'input_right': right_input,
        'input_lr_ratio': lr_input,
        # Brainstem hemisphere split
        'passes_brainstem': passes_brainstem_count,
        'left_bs': left_bs_count,
        'right_bs': right_bs_count,
        'lr_bs': lr_bs,
        # Motor cortex counts (before exclusion)
        'passes_left_motor': passes_left_motor_count,
        'passes_right_motor': passes_right_motor_count,
        # Conditional motor yields
        'left_motor_yield': (passes_left_motor_count / left_bs_count * 100) if left_bs_count > 0 else 0,
        'right_motor_yield': (passes_right_motor_count / right_bs_count * 100) if right_bs_count > 0 else 0,
        # Final counts
        'cst_left_count': len(cst_left),
        'cst_right_count': len(cst_right),
        'cst_total_count': len(cst_combined),
        'bilateral_excluded': bilateral_excluded_count,
        'midline_excluded': midline_excluded_count,
        'extraction_rate': len(cst_combined) / len(streamlines) * 100 if len(streamlines) > 0 else 0,
    }

    if verbose:
        print(f"\n" + "=" * 60)
        print("PASS-THROUGH EXTRACTION COMPLETE")
        print("=" * 60)

        # Per-stage asymmetry analysis for diagnosis
        lr_motor = passes_left_motor_count / passes_right_motor_count if passes_right_motor_count > 0 else 0
        lr_final = len(cst_left) / len(cst_right) if len(cst_right) > 0 else 0

        print(f"\nPer-Stage Asymmetry Analysis:")
        print(f"    Stage                           Left      Right     L/R Ratio")
        print(f"    " + "-" * 60)
        print(f"    Input (post-length filter)  {left_input:>8,}  {right_input:>8,}     {lr_input:.3f}")
        print(f"    Pass through brainstem      {left_bs_count:>8,}  {right_bs_count:>8,}     {lr_bs:.3f}")
        print(f"    Pass through motor cortex   {passes_left_motor_count:>8,}  {passes_right_motor_count:>8,}     {lr_motor:.3f}")
        print(f"    Final (after exclusions)    {len(cst_left):>8,}  {len(cst_right):>8,}     {lr_final:.3f}")

        # Conditional motor yields (key diagnostic)
        left_yield = stats['left_motor_yield']
        right_yield = stats['right_motor_yield']
        print(f"\n    Conditional motor yields (P(motor | brainstem+hemisphere)):")
        print(f"      Left:  {left_yield:5.1f}% ({passes_left_motor_count:,} / {left_bs_count:,})")
        print(f"      Right: {right_yield:5.1f}% ({passes_right_motor_count:,} / {right_bs_count:,})")
        if right_yield > 0:
            yield_ratio = left_yield / right_yield
            print(f"      Yield ratio (L/R): {yield_ratio:.3f}")

        # Brainstem ROI geometry
        print(f"\n    Brainstem ROI Geometry:")
        bs_geom = get_roi_geometry(brainstem, affine, "brainstem")
        print_roi_geometry(bs_geom)

        # Brainstem hemisphere split analysis
        bs_split = get_roi_hemisphere_split(brainstem, affine, "brainstem")
        print_roi_hemisphere_split(bs_split)

        # Brainstem entry point distribution (classification alignment diagnostic)
        if left_entry_xs and right_entry_xs:
            left_entry_xs_arr = np.array(left_entry_xs)
            right_entry_xs_arr = np.array(right_entry_xs)
            print(f"\n    Brainstem entry point distribution:")
            print(f"      Left-classified streamlines (n={len(left_entry_xs_arr):,}):")
            print(f"        Entry X mean: {np.mean(left_entry_xs_arr):.1f} mm")
            left_correct = np.sum(left_entry_xs_arr < 0)
            print(f"        Enter left side (X<0): {left_correct:,} ({left_correct / len(left_entry_xs_arr) * 100:.1f}%)")
            print(f"      Right-classified streamlines (n={len(right_entry_xs_arr):,}):")
            print(f"        Entry X mean: {np.mean(right_entry_xs_arr):.1f} mm")
            right_correct = np.sum(right_entry_xs_arr >= 0)
            print(f"        Enter right side (X>=0): {right_correct:,} ({right_correct / len(right_entry_xs_arr) * 100:.1f}%)")
            # Summary interpretation
            left_pct = left_correct / len(left_entry_xs_arr) * 100
            right_pct = right_correct / len(right_entry_xs_arr) * 100
            if left_pct > 80 and right_pct > 80:
                print(f"      -> Classification aligns with brainstem entry (likely FA/tracking asymmetry)")
            elif left_pct < 60 or right_pct < 60:
                print(f"      -> Classification mismatch detected (centroid X misleading)")
            else:
                print(f"      -> Moderate alignment (mixed classification)")

        # Inferior extent analysis (streamlines failing brainstem check)
        if left_fail_min_zs and right_fail_min_zs:
            left_fail_arr = np.array(left_fail_min_zs)
            right_fail_arr = np.array(right_fail_min_zs)
            print(f"\n    Inferior extent (streamlines failing brainstem check):")
            print(f"      Left-classified (n={len(left_fail_arr):,}):")
            print(f"        Mean min Z: {np.mean(left_fail_arr):.1f} mm")
            print(f"      Right-classified (n={len(right_fail_arr):,}):")
            print(f"        Mean min Z: {np.mean(right_fail_arr):.1f} mm")
            z_diff = np.mean(left_fail_arr) - np.mean(right_fail_arr)
            if z_diff > 2.0:
                print(f"      -> Left streamlines stop {z_diff:.1f} mm higher (earlier termination)")
            elif z_diff < -2.0:
                print(f"      -> Right streamlines stop {-z_diff:.1f} mm higher (earlier termination)")
            else:
                print(f"      -> Similar inferior extent (Δ = {z_diff:.1f} mm)")

        # ROI geometry (for diagnosing spatial positioning)
        print(f"\n    Motor ROI Geometry:")
        left_geom = get_roi_geometry(motor_left, affine, "motor_left")
        right_geom = get_roi_geometry(motor_right, affine, "motor_right")
        print_roi_geometry(left_geom)
        print_roi_geometry(right_geom)
        if left_geom and right_geom:
            midline_diff = right_geom['midline_dist'] - left_geom['midline_dist']
            if abs(midline_diff) > 2.0:
                closer = "right" if midline_diff < 0 else "left"
                print(f"    → {closer} motor ROI is {abs(midline_diff):.1f} mm closer to midline")

        # Highlight where asymmetry is introduced
        print(f"\n    Asymmetry changes:")
        if abs(lr_input - 1.0) > 0.05:
            print(f"      Input tractogram already asymmetric (L/R = {lr_input:.3f})")
        if abs(lr_bs - lr_input) > 0.05:
            print(f"      Brainstem check: {lr_input:.3f}{lr_bs:.3f} (Δ = {lr_bs - lr_input:+.3f})")
        if abs(lr_motor - lr_bs) > 0.05:
            print(f"      Motor cortex check: {lr_bs:.3f}{lr_motor:.3f} (Δ = {lr_motor - lr_bs:+.3f})")
        if abs(lr_final - lr_motor) > 0.05:
            print(f"      Exclusion filters: {lr_motor:.3f}{lr_final:.3f} (Δ = {lr_final - lr_motor:+.3f})")
        if (abs(lr_input - 1.0) <= 0.05 and abs(lr_bs - lr_input) <= 0.05 and
            abs(lr_motor - lr_bs) <= 0.05 and abs(lr_final - lr_motor) <= 0.05):
            print(f"      No significant asymmetry changes detected")

        print(f"\nResults:")
        print(f"    Left CST:  {stats['cst_left_count']:,} streamlines")
        print(f"    Right CST: {stats['cst_right_count']:,} streamlines")
        print(f"    Total:     {stats['cst_total_count']:,} streamlines")
        print(f"    Rejected (Bilateral): {bilateral_excluded_count:,} streamlines")
        print(f"    Rejected (Midline):   {midline_excluded_count:,} streamlines")

    return {
        'cst_left': cst_left,
        'cst_right': cst_right,
        'cst_combined': cst_combined,
        'stats': stats
    }

extract_cst_roi_seeded(data, gtab, affine, brain_mask, motor_left_mask, motor_right_mask, brainstem_mask, fa_map=None, fa_threshold=0.15, seed_density=2, step_size=0.5, min_length=30.0, max_length=200.0, sh_order=6, relative_peak_threshold=0.5, min_separation_angle=25, verbose=True)

Extract bilateral CST using ROI-seeded tractography.

This method: 1. Seeds streamlines from motor cortex (left and right separately) 2. Tracks bidirectionally using CSA-ODF direction field 3. Filters streamlines that reach the brainstem

Parameters:

Name Type Description Default
data (ndarray, shape(X, Y, Z, N))

DWI data.

required
gtab GradientTable

Gradient table.

required
affine (ndarray, shape(4, 4))

Affine matrix.

required
brain_mask ndarray

3D binary brain mask.

required
motor_left_mask ndarray

3D binary mask for left motor cortex (seed ROI).

required
motor_right_mask ndarray

3D binary mask for right motor cortex (seed ROI).

required
brainstem_mask ndarray

3D binary mask for brainstem (target ROI).

required
fa_map ndarray

Pre-computed FA map. If None, will be computed.

None
fa_threshold float

FA threshold for stopping criterion. Default is 0.15.

0.15
seed_density int

Seeds per voxel dimension. Default is 2.

2
step_size float

Tracking step size in mm. Default is 0.5.

0.5
min_length float

Minimum streamline length in mm. Default is 30.

30.0
max_length float

Maximum streamline length in mm. Default is 200.

200.0
sh_order int

Spherical harmonic order for CSA-ODF. Default is 6.

6
relative_peak_threshold float

Threshold for peak extraction (relative to max). Default is 0.5. Lower values allow more peaks, enabling better crossing fiber handling.

0.5
min_separation_angle int

Minimum angle between peaks in degrees. Default is 25. Lower values allow sharper turns during tracking.

25
verbose bool

Print progress information.

True

Returns:

Name Type Description
result dict

Dictionary containing: - 'cst_left': Left CST streamlines - 'cst_right': Right CST streamlines - 'cst_combined': All CST streamlines - 'stats': Extraction statistics - 'parameters': Tracking parameters used

Source code in src/csttool/extract/modules/roi_seeded_tracking.py
def extract_cst_roi_seeded(
    data,
    gtab,
    affine,
    brain_mask,
    motor_left_mask,
    motor_right_mask,
    brainstem_mask,
    fa_map=None,
    fa_threshold=0.15,
    seed_density=2,
    step_size=0.5,
    min_length=30.0,
    max_length=200.0,
    sh_order=6,
    relative_peak_threshold=0.5,
    min_separation_angle=25,
    verbose=True
):
    """
    Extract bilateral CST using ROI-seeded tractography.

    This method:
    1. Seeds streamlines from motor cortex (left and right separately)
    2. Tracks bidirectionally using CSA-ODF direction field
    3. Filters streamlines that reach the brainstem

    Parameters
    ----------
    data : ndarray, shape (X, Y, Z, N)
        DWI data.
    gtab : GradientTable
        Gradient table.
    affine : ndarray, shape (4, 4)
        Affine matrix.
    brain_mask : ndarray
        3D binary brain mask.
    motor_left_mask : ndarray
        3D binary mask for left motor cortex (seed ROI).
    motor_right_mask : ndarray
        3D binary mask for right motor cortex (seed ROI).
    brainstem_mask : ndarray
        3D binary mask for brainstem (target ROI).
    fa_map : ndarray, optional
        Pre-computed FA map. If None, will be computed.
    fa_threshold : float, optional
        FA threshold for stopping criterion. Default is 0.15.
    seed_density : int, optional
        Seeds per voxel dimension. Default is 2.
    step_size : float, optional
        Tracking step size in mm. Default is 0.5.
    min_length : float, optional
        Minimum streamline length in mm. Default is 30.
    max_length : float, optional
        Maximum streamline length in mm. Default is 200.
    sh_order : int, optional
        Spherical harmonic order for CSA-ODF. Default is 6.
    relative_peak_threshold : float, optional
        Threshold for peak extraction (relative to max). Default is 0.5.
        Lower values allow more peaks, enabling better crossing fiber handling.
    min_separation_angle : int, optional
        Minimum angle between peaks in degrees. Default is 25.
        Lower values allow sharper turns during tracking.
    verbose : bool, optional
        Print progress information.

    Returns
    -------
    result : dict
        Dictionary containing:
        - 'cst_left': Left CST streamlines
        - 'cst_right': Right CST streamlines
        - 'cst_combined': All CST streamlines
        - 'stats': Extraction statistics
        - 'parameters': Tracking parameters used
    """
    from dipy.reconst.dti import TensorModel

    if verbose:
        print("=" * 60)
        print("ROI-SEEDED CST EXTRACTION")
        print("=" * 60)

    # -------------------------------------------------------------------------
    # Step 1: Compute FA if not provided
    # -------------------------------------------------------------------------
    if fa_map is None:
        if verbose:
            print("\n[Step 1/5] Computing FA map...")
        tensor_model = TensorModel(gtab)
        tensor_fit = tensor_model.fit(data, mask=brain_mask)
        fa_map = tensor_fit.fa
        fa_map = np.clip(fa_map, 0, 1)
    else:
        if verbose:
            print("\n[Step 1/5] Using provided FA map...")

    # -------------------------------------------------------------------------
    # Step 2: Estimate fiber directions using CSA-ODF
    # -------------------------------------------------------------------------
    if verbose:
        print("\n[Step 2/5] Estimating fiber directions (CSA-ODF)...")

    # Validate SH order based on available gradient directions
    from csttool.tracking.modules.estimate_directions import validate_sh_order
    validated_sh_order = validate_sh_order(gtab, sh_order, verbose=verbose)

    csa_model = CsaOdfModel(gtab, sh_order=validated_sh_order)

    # Create white matter mask for direction estimation
    wm_mask = fa_map > fa_threshold

    # Peak extraction - angular constraints are set HERE, not in LocalTracking
    peaks = peaks_from_model(
        csa_model,
        data,
        default_sphere,
        relative_peak_threshold=relative_peak_threshold,
        min_separation_angle=min_separation_angle,
        mask=wm_mask
    )

    if verbose:
        print(f"    Peaks computed for {np.sum(wm_mask):,} voxels")

    # -------------------------------------------------------------------------
    # Step 3: Setup stopping criterion
    # -------------------------------------------------------------------------
    if verbose:
        print(f"\n[Step 3/5] Setting up stopping criterion (FA > {fa_threshold})...")

    stopping_criterion = ThresholdStoppingCriterion(fa_map, fa_threshold)

    # -------------------------------------------------------------------------
    # Step 4: Extract Left CST
    # -------------------------------------------------------------------------
    if verbose:
        print("\n[Step 4/5] Extracting Left CST...")
        print(f"    Generating seeds from left motor cortex (density={seed_density})...")

    left_seeds = generate_seeds_from_mask(motor_left_mask, affine, density=seed_density)

    if verbose:
        print(f"    Seeds: {len(left_seeds):,}")
        print(f"    Tracking...")

    left_streamlines_raw = track_from_seeds(
        left_seeds,
        peaks,
        stopping_criterion,
        affine,
        step_size=step_size
    )

    if verbose:
        print(f"    Generated: {len(left_streamlines_raw):,} streamlines")
        print(f"    Filtering by length...")

    left_streamlines_length = filter_by_length(
        left_streamlines_raw, 
        min_length=min_length, 
        max_length=max_length,
        verbose=verbose
    )

    if verbose:
        print(f"    Filtering by brainstem traversal...")

    cst_left, _ = filter_by_target_roi(
        left_streamlines_length,
        brainstem_mask,
        affine,
        verbose=verbose
    )

    if verbose:
        print(f"    ✓ Left CST: {len(cst_left):,} streamlines")

    # -------------------------------------------------------------------------
    # Step 5: Extract Right CST
    # -------------------------------------------------------------------------
    if verbose:
        print("\n[Step 5/5] Extracting Right CST...")
        print(f"    Generating seeds from right motor cortex (density={seed_density})...")

    right_seeds = generate_seeds_from_mask(motor_right_mask, affine, density=seed_density)

    if verbose:
        print(f"    Seeds: {len(right_seeds):,}")
        print(f"    Tracking...")

    right_streamlines_raw = track_from_seeds(
        right_seeds,
        peaks,
        stopping_criterion,
        affine,
        step_size=step_size
    )

    if verbose:
        print(f"    Generated: {len(right_streamlines_raw):,} streamlines")
        print(f"    Filtering by length...")

    right_streamlines_length = filter_by_length(
        right_streamlines_raw,
        min_length=min_length,
        max_length=max_length,
        verbose=verbose
    )

    if verbose:
        print(f"    Filtering by brainstem traversal...")

    cst_right, _ = filter_by_target_roi(
        right_streamlines_length,
        brainstem_mask,
        affine,
        verbose=verbose
    )

    if verbose:
        print(f"    ✓ Right CST: {len(cst_right):,} streamlines")

    # -------------------------------------------------------------------------
    # Combine results
    # -------------------------------------------------------------------------
    cst_combined = Streamlines(list(cst_left) + list(cst_right))

    stats = {
        'left_seeds': len(left_seeds),
        'right_seeds': len(right_seeds),
        'left_raw': len(left_streamlines_raw),
        'right_raw': len(right_streamlines_raw),
        'left_after_length': len(left_streamlines_length),
        'right_after_length': len(right_streamlines_length),
        'cst_left_count': len(cst_left),
        'cst_right_count': len(cst_right),
        'cst_total_count': len(cst_combined),
        'left_yield': len(cst_left) / len(left_seeds) * 100 if len(left_seeds) > 0 else 0,
        'right_yield': len(cst_right) / len(right_seeds) * 100 if len(right_seeds) > 0 else 0,
        'extraction_rate': len(cst_combined) / (len(left_seeds) + len(right_seeds)) * 100 if (len(left_seeds) + len(right_seeds)) > 0 else 0,
        'method': 'roi-seeded'
    }

    # Add length statistics if we have streamlines
    if len(cst_combined) > 0:
        cst_lengths = np.array([streamline_length(sl) for sl in cst_combined])
        stats['length_mean'] = float(np.mean(cst_lengths))
        stats['length_std'] = float(np.std(cst_lengths))
        stats['length_min'] = float(np.min(cst_lengths))
        stats['length_max'] = float(np.max(cst_lengths))

    parameters = {
        'fa_threshold': fa_threshold,
        'seed_density': seed_density,
        'step_size': step_size,
        'min_length': min_length,
        'max_length': max_length,
        'sh_order': validated_sh_order,
        'relative_peak_threshold': relative_peak_threshold,
        'min_separation_angle': min_separation_angle,
    }

    if verbose:
        print("\n" + "=" * 60)
        print("ROI-SEEDED EXTRACTION COMPLETE")
        print("=" * 60)
        print(f"\nResults:")
        print(f"    Left CST:  {stats['cst_left_count']:,} streamlines ({stats['left_yield']:.2f}% yield)")
        print(f"    Right CST: {stats['cst_right_count']:,} streamlines ({stats['right_yield']:.2f}% yield)")
        print(f"    Total:     {stats['cst_total_count']:,} streamlines")

        if 'length_mean' in stats:
            print(f"    Length: mean={stats['length_mean']:.1f}, "
                  f"range=[{stats['length_min']:.1f}, {stats['length_max']:.1f}] mm")

    return {
        'cst_left': cst_left,
        'cst_right': cst_right,
        'cst_combined': cst_combined,
        'stats': stats,
        'parameters': parameters
    }

extract_cst_bidirectional(data, gtab, affine, brain_mask, motor_left_mask, motor_right_mask, brainstem_mask, fa_map=None, fa_threshold=0.15, seed_density=2, step_size=0.5, min_length=30.0, max_length=200.0, sh_order=6, relative_peak_threshold=0.5, min_separation_angle=25, verbose=True)

Extract bilateral CST using bidirectional seeding with intersection.

Parameters:

Name Type Description Default
data (ndarray, shape(X, Y, Z, N))

DWI data.

required
gtab GradientTable

Gradient table.

required
affine (ndarray, shape(4, 4))

Affine matrix.

required
brain_mask ndarray

3D binary brain mask.

required
motor_left_mask ndarray

3D binary mask for left motor cortex.

required
motor_right_mask ndarray

3D binary mask for right motor cortex.

required
brainstem_mask ndarray

3D binary mask for brainstem.

required
fa_map ndarray

Pre-computed FA map. Computed from DTI if not provided.

None
fa_threshold float

FA tracking threshold. Default 0.15.

0.15
seed_density int

Seeds per voxel dimension. Default 2.

2
step_size float

Tracking step size in mm. Default 0.5.

0.5
min_length float

Minimum streamline length in mm. Default 30.

30.0
max_length float

Maximum streamline length in mm. Default 200.

200.0
sh_order int

Spherical harmonic order for CSA-ODF. Default 6.

6
relative_peak_threshold float

Relative peak threshold. Default 0.5.

0.5
min_separation_angle int

Minimum angle between ODF peaks in degrees. Default 25.

25
verbose bool

Print progress. Default True.

True

Returns:

Name Type Description
result dict

Keys: cst_left, cst_right, cst_combined, stats, parameters.

Source code in src/csttool/extract/modules/bidirectional_filtering.py
def extract_cst_bidirectional(
    data,
    gtab,
    affine,
    brain_mask,
    motor_left_mask,
    motor_right_mask,
    brainstem_mask,
    fa_map=None,
    fa_threshold=0.15,
    seed_density=2,
    step_size=0.5,
    min_length=30.0,
    max_length=200.0,
    sh_order=6,
    relative_peak_threshold=0.5,
    min_separation_angle=25,
    verbose=True,
):
    """
    Extract bilateral CST using bidirectional seeding with intersection.

    Parameters
    ----------
    data : ndarray, shape (X, Y, Z, N)
        DWI data.
    gtab : GradientTable
        Gradient table.
    affine : ndarray, shape (4, 4)
        Affine matrix.
    brain_mask : ndarray
        3D binary brain mask.
    motor_left_mask : ndarray
        3D binary mask for left motor cortex.
    motor_right_mask : ndarray
        3D binary mask for right motor cortex.
    brainstem_mask : ndarray
        3D binary mask for brainstem.
    fa_map : ndarray, optional
        Pre-computed FA map. Computed from DTI if not provided.
    fa_threshold : float, optional
        FA tracking threshold. Default 0.15.
    seed_density : int, optional
        Seeds per voxel dimension. Default 2.
    step_size : float, optional
        Tracking step size in mm. Default 0.5.
    min_length : float, optional
        Minimum streamline length in mm. Default 30.
    max_length : float, optional
        Maximum streamline length in mm. Default 200.
    sh_order : int, optional
        Spherical harmonic order for CSA-ODF. Default 6.
    relative_peak_threshold : float, optional
        Relative peak threshold. Default 0.5.
    min_separation_angle : int, optional
        Minimum angle between ODF peaks in degrees. Default 25.
    verbose : bool, optional
        Print progress. Default True.

    Returns
    -------
    result : dict
        Keys: cst_left, cst_right, cst_combined, stats, parameters.
    """
    from dipy.reconst.dti import TensorModel

    if verbose:
        print("=" * 60)
        print("BIDIRECTIONAL CST EXTRACTION")
        print("=" * 60)

    # ------------------------------------------------------------------
    # Step 1: FA + ODF peaks + stopping criterion
    # ------------------------------------------------------------------
    if fa_map is None:
        if verbose:
            print("\n[Step 1/6] Computing FA map...")
        tensor_model = TensorModel(gtab)
        tensor_fit = tensor_model.fit(data, mask=brain_mask)
        fa_map = np.clip(tensor_fit.fa, 0, 1)
    else:
        if verbose:
            print("\n[Step 1/6] Using provided FA map...")

    if verbose:
        print("\n[Step 2/6] Estimating fiber directions (CSA-ODF)...")

    from csttool.tracking.modules.estimate_directions import validate_sh_order
    validated_sh_order = validate_sh_order(gtab, sh_order, verbose=verbose)

    csa_model = CsaOdfModel(gtab, sh_order=validated_sh_order)
    wm_mask = fa_map > fa_threshold
    peaks = peaks_from_model(
        csa_model, data, default_sphere,
        relative_peak_threshold=relative_peak_threshold,
        min_separation_angle=min_separation_angle,
        mask=wm_mask,
    )

    if verbose:
        print(f"    Peaks computed for {wm_mask.sum():,} voxels")
        print(f"\n[Step 3/6] Setting up stopping criterion (FA > {fa_threshold})...")

    stopping_criterion = ThresholdStoppingCriterion(fa_map, fa_threshold)
    shape = brain_mask.shape

    # ------------------------------------------------------------------
    # Step 2: Forward left (motor left → brainstem)
    # ------------------------------------------------------------------
    if verbose:
        print("\n[Step 4/6] Forward pass: motor cortex → brainstem...")

    left_seeds = generate_seeds_from_mask(motor_left_mask, affine, density=seed_density)
    right_seeds = generate_seeds_from_mask(motor_right_mask, affine, density=seed_density)

    if verbose:
        print(f"    Left motor seeds:  {len(left_seeds):,}")
        print(f"    Right motor seeds: {len(right_seeds):,}")

    left_fwd_all = filter_by_length(
        track_from_seeds(left_seeds, peaks, stopping_criterion, affine, step_size),
        min_length, max_length, verbose=verbose,
    )
    left_fwd, _ = filter_by_target_roi(left_fwd_all, brainstem_mask, affine, verbose=verbose)

    right_fwd_all = filter_by_length(
        track_from_seeds(right_seeds, peaks, stopping_criterion, affine, step_size),
        min_length, max_length, verbose=verbose,
    )
    right_fwd, _ = filter_by_target_roi(right_fwd_all, brainstem_mask, affine, verbose=verbose)

    if verbose:
        print(f"    Forward left → brainstem:  {len(left_fwd):,}")
        print(f"    Forward right → brainstem: {len(right_fwd):,}")

    # ------------------------------------------------------------------
    # Step 3: Reverse pass (brainstem → motor cortex, L and R)
    # ------------------------------------------------------------------
    if verbose:
        print("\n[Step 5/6] Reverse pass: brainstem → motor cortex...")

    bs_seeds = generate_seeds_from_mask(brainstem_mask, affine, density=seed_density)

    if verbose:
        print(f"    Brainstem seeds: {len(bs_seeds):,}")

    bs_all = filter_by_length(
        track_from_seeds(bs_seeds, peaks, stopping_criterion, affine, step_size),
        min_length, max_length, verbose=verbose,
    )

    bs_to_left, _ = filter_by_target_roi(bs_all, motor_left_mask, affine, verbose=verbose)
    bs_to_right, _ = filter_by_target_roi(bs_all, motor_right_mask, affine, verbose=verbose)

    if verbose:
        print(f"    Brainstem → left motor:  {len(bs_to_left):,}")
        print(f"    Brainstem → right motor: {len(bs_to_right):,}")

    # ------------------------------------------------------------------
    # Step 4: Count-bounded intersection
    #
    # The naive "density > 0" intersection is a no-op: both forward and
    # reverse bundles traverse the same voxels (the CST territory), so
    # every forward streamline passes through the reverse confirmed zone.
    #
    # The source of the asymmetry is in HOW MANY seeds initiate valid CST
    # trajectories from each motor cortex ROI, not in where those
    # trajectories go.  The reverse (brainstem-seeded) count is the
    # symmetric, unbiased reference.
    #
    # Fix: cap the forward count at min(N_fwd, N_reverse) per side,
    # choosing the forward streamlines ranked highest by spatial overlap
    # with the reverse-pass density map.  This selects the "most
    # representative" forward streamlines and discards excess ones that
    # arose from the ROI placement advantage.
    # ------------------------------------------------------------------
    if verbose:
        print("\n[Step 6/6] Count-bounded intersection (forward ranked by reverse density)...")

    dens_L = _voxelise(bs_to_left, affine, shape)
    dens_R = _voxelise(bs_to_right, affine, shape)

    if verbose:
        print(f"    Reverse density nonzero: left={int((dens_L > 0).sum()):,}  right={int((dens_R > 0).sum()):,}")

    def _select_top_n(forward_bundle, density, n_keep):
        if len(forward_bundle) == 0 or n_keep == 0:
            return Streamlines()
        scores = np.array([_overlap_score(s, density, affine) for s in forward_bundle])
        n_keep = min(n_keep, int((scores > 0).sum()))   # can't keep more than have overlap
        if n_keep == 0:
            return Streamlines()
        top_idx = np.argsort(scores)[-n_keep:]
        return Streamlines([forward_bundle[i] for i in sorted(top_idx)])

    # Per-side cap only — bilateral symmetry is NOT enforced. Each hemisphere is
    # bounded by its own reverse-pass count, which removes the cortical-interface
    # inflation per side without assuming the two sides should match. This
    # preserves genuine asymmetry (e.g. unilateral motor pathology) while still
    # correcting the placement artifact.
    n_keep_L = min(len(left_fwd), len(bs_to_left))
    n_keep_R = min(len(right_fwd), len(bs_to_right))

    if verbose:
        print(f"    Per-side cap — left:  min({len(left_fwd)}, {len(bs_to_left)}) = {n_keep_L}")
        print(f"    Per-side cap — right: min({len(right_fwd)}, {len(bs_to_right)}) = {n_keep_R}")

    cst_left  = _select_top_n(left_fwd,  dens_L, n_keep_L)
    cst_right = _select_top_n(right_fwd, dens_R, n_keep_R)

    if verbose:
        print(f"    After intersection — left:  {len(left_fwd):,}{len(cst_left):,}")
        print(f"    After intersection — right: {len(right_fwd):,}{len(cst_right):,}")

    cst_combined = Streamlines(list(cst_left) + list(cst_right))

    # ------------------------------------------------------------------
    # Stats
    # ------------------------------------------------------------------
    # Forward/reverse inflation per side. Comparing the two ratios indicates
    # whether residual L/R count asymmetry is methodological (interface artifact)
    # or structural (pathology / genuine biology).
    #   inflation_L ≈ inflation_R → both sides over-produced equally; residual
    #     asymmetry is structural.
    #   inflation_L ≠ inflation_R → one cortical ROI seeded more efficiently
    #     than the other; residual asymmetry is suspect.
    inflation_L = len(left_fwd) / max(len(bs_to_left), 1)
    inflation_R = len(right_fwd) / max(len(bs_to_right), 1)
    artifact_index = (
        abs(inflation_L - inflation_R) / max(inflation_L, inflation_R, 1.0)
    )

    stats = {
        'left_seeds': len(left_seeds),
        'right_seeds': len(right_seeds),
        'bs_seeds': len(bs_seeds),
        'left_forward_count': len(left_fwd),
        'right_forward_count': len(right_fwd),
        'bs_to_left_count': len(bs_to_left),
        'bs_to_right_count': len(bs_to_right),
        'cst_left_count': len(cst_left),
        'cst_right_count': len(cst_right),
        'cst_total_count': len(cst_combined),
        'total_input': len(left_fwd) + len(right_fwd),
        'left_intersection_rate': len(cst_left) / max(len(left_fwd), 1) * 100,
        'right_intersection_rate': len(cst_right) / max(len(right_fwd), 1) * 100,
        'left_yield': len(cst_left) / max(len(left_seeds), 1) * 100,
        'right_yield': len(cst_right) / max(len(right_seeds), 1) * 100,
        'extraction_rate': len(cst_combined) / max(len(left_seeds) + len(right_seeds), 1) * 100,
        'forward_reverse_ratio_left': inflation_L,
        'forward_reverse_ratio_right': inflation_R,
        'artifact_index': artifact_index,
        'method': 'bidirectional',
    }

    if len(cst_combined) > 0:
        lengths = np.array([streamline_length(s) for s in cst_combined])
        stats['length_mean'] = float(np.mean(lengths))
        stats['length_std'] = float(np.std(lengths))
        stats['length_min'] = float(np.min(lengths))
        stats['length_max'] = float(np.max(lengths))

    parameters = {
        'fa_threshold': fa_threshold,
        'seed_density': seed_density,
        'step_size': step_size,
        'min_length': min_length,
        'max_length': max_length,
        'sh_order': validated_sh_order,
        'relative_peak_threshold': relative_peak_threshold,
        'min_separation_angle': min_separation_angle,
    }

    if verbose:
        print("\n" + "=" * 60)
        print("BIDIRECTIONAL EXTRACTION COMPLETE")
        print("=" * 60)
        lr = len(cst_right) / max(len(cst_left), 1)
        print(f"\n  Left CST:  {stats['cst_left_count']:,} streamlines ({stats['left_yield']:.2f}% yield)")
        print(f"  Right CST: {stats['cst_right_count']:,} streamlines ({stats['right_yield']:.2f}% yield)")
        print(f"  Total:     {stats['cst_total_count']:,} streamlines  R/L = {lr:.3f}")
        print(f"  Forward/reverse inflation — left: {inflation_L:.2f}  right: {inflation_R:.2f}")
        if artifact_index > 0.2:
            print(f"  Artifact index: {artifact_index:.2f} (>0.20) — cortical interface "
                  f"asymmetry suspected; residual L/R count difference may be methodological.")
        else:
            print(f"  Artifact index: {artifact_index:.2f} (≤0.20) — inflation symmetric; "
                  f"residual L/R count difference is likely structural.")
        if 'length_mean' in stats:
            print(f"  Length: mean={stats['length_mean']:.1f}, "
                  f"range=[{stats['length_min']:.1f}, {stats['length_max']:.1f}] mm")

    return {
        'cst_left': cst_left,
        'cst_right': cst_right,
        'cst_combined': cst_combined,
        'stats': stats,
        'parameters': parameters,
    }