Automated Step Detection in Force Spectroscopy Data

The script loads raw force-vs-time data from a CSV file, automatically identifies step-like changes in the force signal, and exports a cleaned list of meaningful force transitions to Excel. It first detects changepoints in the force trace using a PELT-based algorithm from the ruptures library, which segments the signal into plateaus and finds the boundaries between them. Each boundary is treated as a candidate step, with its timestamp, the average force before and after, and the delta force (direction up or down). After detection, the script applies two biologically motivated cleanup rules: (1) it merges consecutive steps in the same direction that occur within a short time window, treating them as a single physical event instead of multiple tiny jumps; and (2) it removes fast “spike” pairs where the force briefly jumps up and then down (or down then up) within a very short interval, which are likely noise rather than true mechanical steps. Finally, it filters for steps with an absolute amplitude ≥ 1 pN and writes several worksheets to an Excel file, including the raw detection, intermediate cleanup stages, and the final curated step list. This makes it easy to batch-process kymograph force traces and consistently quantify discrete force changes.

# ------------------------------------------------------------
# Requirements:
# pip install pandas numpy ruptures openpyxl
# ------------------------------------------------------------

import pandas as pd
import numpy as np
import ruptures as rpt
import os

########################
# 1. User I/O settings #
########################

infile = "./ForceHFCorrectedForce2x_p853_p502_b3_1_downsampled1000_HF_HF.csv"
outfile = "force_steps.xlsx"

MIN_STEP = 1.0          # pN threshold to report
FALLBACK_FS = 78.1      # Hz, if no time column
PENALTY = 5             # ruptures sensitivity

# post-processing params:
MERGE_WINDOW = 0.10     # sec: merge same-direction steps < this apart
CANCEL_WINDOW = 0.20    # sec: remove opposite-direction spike pairs < this apart

#####################################
# 2. Load data and pick time/force  #
#####################################

df = pd.read_csv(infile)

# Guess force column
force_col_candidates = [
    "Corrected Force 2x",
    "Corrected Force",
    "Force",
    "Force HF",
    "force",
    "force_pN",
]
force_col = None
for c in force_col_candidates:
    if c in df.columns:
        force_col = c
        break
if force_col is None:
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) == 0:
        raise ValueError("No numeric force column found in CSV.")
    force_col = numeric_cols[-1]

# Guess time column
time_col_candidates = [
    "Time",
    "time",
    "t",
    "timestamp",
    "Timestamp (s)",
    "Seconds",
]
time_col = None
for c in time_col_candidates:
    if c in df.columns:
        time_col = c
        break
if time_col is None:
    n = len(df)
    dt = 1.0 / FALLBACK_FS
    df["__time_s"] = np.arange(n) * dt
    time_col = "__time_s"

time = df[time_col].to_numpy()
force = df[force_col].to_numpy()

###########################################
# 3. Changepoint detection (ruptures PELT)#
###########################################

algo = rpt.Pelt(model="l2").fit(force)
bkpts = algo.predict(pen=PENALTY)
# bkpts are 1-based segment ends

seg_starts = [0] + [b for b in bkpts[:-1]]
seg_ends   = [b for b in bkpts]  # 1-based inclusive ends

segments = []
for start, end in zip(seg_starts, seg_ends):
    seg_force = force[start:end]
    seg_time = time[start:end]
    mean_force = np.mean(seg_force)
    mid_idx = start + (len(seg_force) // 2)
    mid_time = time[mid_idx]
    segments.append(
        {
            "start_idx": start,
            "end_idx": end - 1,
            "mean_force": mean_force,
            "mid_time": mid_time,
        }
    )

# Build raw step list
raw_changes = []
for i in range(len(segments) - 1):
    f_before = segments[i]["mean_force"]
    f_after = segments[i + 1]["mean_force"]
    delta_f = f_after - f_before
    boundary_idx = segments[i]["end_idx"]
    change_time = time[boundary_idx]
    raw_changes.append(
        {
            "change_time_s": change_time,
            "force_before_pN": f_before,
            "force_after_pN": f_after,
            "delta_force_pN": delta_f,
            "direction": "up" if delta_f > 0 else "down",
        }
    )

changes_df = pd.DataFrame(raw_changes)

###########################################################
# 4. POST-PROCESSING STEPS
#
# 4a. Merge consecutive steps with SAME direction if they
#     happen within MERGE_WINDOW seconds.
#
#     Example: down at t=58.63s then down again at t=58.69s
###########################################################

def merge_close_same_direction(df_steps, merge_window):
    if df_steps.empty:
        return df_steps.copy()

    merged = []
    cur = df_steps.iloc[0].to_dict()

    for idx in range(1, len(df_steps)):
        row = df_steps.iloc[idx].to_dict()

        same_dir = (row["direction"] == cur["direction"])
        close_in_time = (row["change_time_s"] - cur["change_time_s"]) <= merge_window

        if same_dir and close_in_time:
            # merge cur and row into a single bigger jump:
            # new before: cur.force_before_pN
            # new after: row.force_after_pN
            # new delta: after - before
            merged_force_before = cur["force_before_pN"]
            merged_force_after  = row["force_after_pN"]
            merged_delta        = merged_force_after - merged_force_before

            cur["force_after_pN"] = merged_force_after
            cur["delta_force_pN"] = merged_delta
            cur["direction"] = "up" if merged_delta > 0 else "down"
            # keep cur["change_time_s"] as first event time
        else:
            merged.append(cur)
            cur = row

    merged.append(cur)
    return pd.DataFrame(merged)

merged_df = merge_close_same_direction(changes_df, MERGE_WINDOW)

###########################################################
# 4b. Remove spike pairs:
#     If we have two consecutive steps with OPPOSITE direction
#     within CANCEL_WINDOW sec, drop both (treat as noise).
#
#     Example: up at 284.302s, down at 284.430s (~0.13s)
###########################################################

def remove_opposite_spikes(df_steps, cancel_window):
    if len(df_steps) <= 1:
        return df_steps.copy()

    to_drop = set()

    for i in range(len(df_steps) - 1):
        t1 = df_steps.iloc[i]["change_time_s"]
        t2 = df_steps.iloc[i+1]["change_time_s"]
        d1 = df_steps.iloc[i]["direction"]
        d2 = df_steps.iloc[i+1]["direction"]

        if d1 != d2 and (t2 - t1) <= cancel_window:
            # mark both i and i+1 to be dropped
            to_drop.add(i)
            to_drop.add(i+1)

    keep_rows = [i for i in range(len(df_steps)) if i not in to_drop]
    return df_steps.iloc[keep_rows].reset_index(drop=True)

clean_df = remove_opposite_spikes(merged_df, CANCEL_WINDOW)

###########################################################
# 5. Apply amplitude filter |Δforce| >= 1 pN
###########################################################

filtered_df = clean_df[np.abs(clean_df["delta_force_pN"]) >= MIN_STEP].copy()

##########################################
# 6. Save all versions to Excel
##########################################

with pd.ExcelWriter(outfile) as writer:
    changes_df.to_excel(writer, sheet_name="raw_steps", index=False)
    merged_df.to_excel(writer, sheet_name="merged_same_dir", index=False)
    clean_df.to_excel(writer, sheet_name="after_spike_removal", index=False)
    filtered_df.to_excel(writer, sheet_name="final_ge_1pN", index=False)

print(f"Done. Wrote: {outfile}")
print("Sheets in Excel:")
print(" - raw_steps              (direct from ruptures)")
print(" - merged_same_dir        (after merging close same-direction jumps)")
print(" - after_spike_removal    (after removing fast opposite blips)")
print(" - final_ge_1pN           (after |Δforce|≥1 pN filter)")

Leave a Reply

Your email address will not be published. Required fields are marked *