The script loads raw force-vs-time data from a CSV file, automatically identifies step-like changes in the force signal, and exports a cleaned list of meaningful force transitions to Excel. It first detects changepoints in the force trace using the PELT algorithm from the ruptures library, which segments the signal into plateaus and finds the boundaries between them. Each boundary is recorded as a candidate step with its timestamp, the mean force before and after, the force change (Δforce), and its direction (up or down).

After detection, the script applies two biologically motivated cleanup rules: (1) it merges consecutive steps in the same direction that occur within a short time window, treating them as a single physical event instead of multiple tiny jumps; and (2) it removes fast “spike” pairs where the force briefly jumps up and then down (or down then up) within a very short interval, which are likely noise rather than true mechanical steps. Finally, it filters for steps with an absolute amplitude ≥ 1 pN (MIN_STEP) and writes several worksheets to an Excel file: the raw detection, the intermediate cleanup stages, and the final curated step list. This makes it easy to batch-process kymograph force traces and quantify discrete force changes consistently.
# ------------------------------------------------------------
# Requirements:
# pip install pandas numpy ruptures openpyxl
# ------------------------------------------------------------
import pandas as pd
import numpy as np
import ruptures as rpt
########################
# 1. User I/O settings #
########################
infile = "./ForceHFCorrectedForce2x_p853_p502_b3_1_downsampled1000_HF_HF.csv"
outfile = "force_steps.xlsx"
MIN_STEP = 1.0 # pN threshold to report
FALLBACK_FS = 78.1 # Hz, if no time column
PENALTY = 5 # ruptures PELT penalty (higher = less sensitive, fewer changepoints)
# post-processing params:
MERGE_WINDOW = 0.10 # sec: merge same-direction steps < this apart
CANCEL_WINDOW = 0.20 # sec: remove opposite-direction spike pairs < this apart
#####################################
# 2. Load data and pick time/force #
#####################################
df = pd.read_csv(infile)
# Guess force column
force_col_candidates = [
    "Corrected Force 2x",
    "Corrected Force",
    "Force",
    "Force HF",
    "force",
    "force_pN",
]
force_col = None
for c in force_col_candidates:
    if c in df.columns:
        force_col = c
        break
if force_col is None:
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) == 0:
        raise ValueError("No numeric force column found in CSV.")
    force_col = numeric_cols[-1]
# Guess time column
time_col_candidates = [
    "Time",
    "time",
    "t",
    "timestamp",
    "Timestamp (s)",
    "Seconds",
]
time_col = None
for c in time_col_candidates:
    if c in df.columns:
        time_col = c
        break
if time_col is None:
    n = len(df)
    dt = 1.0 / FALLBACK_FS
    df["__time_s"] = np.arange(n) * dt
    time_col = "__time_s"
time = df[time_col].to_numpy()
force = df[force_col].to_numpy()
###########################################
# 3. Changepoint detection (ruptures PELT)#
###########################################
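# PELT ("Pruned Exact Linear Time") with the "l2" (least-squares) cost models the
# trace as piecewise-constant segments and places boundaries where the mean force
# shifts; PENALTY trades goodness of fit against the number of changepoints.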
algo = rpt.Pelt(model="l2").fit(force)
bkpts = algo.predict(pen=PENALTY)
# ruptures returns breakpoints as exclusive segment end indices; the last entry equals len(force)
seg_starts = [0] + list(bkpts[:-1])
seg_ends = list(bkpts)  # exclusive ends, usable directly in Python slices
segments = []
for start, end in zip(seg_starts, seg_ends):
    seg_force = force[start:end]
    seg_time = time[start:end]
    mean_force = np.mean(seg_force)
    mid_idx = start + (len(seg_force) // 2)
    mid_time = time[mid_idx]
    segments.append(
        {
            "start_idx": start,
            "end_idx": end - 1,
            "mean_force": mean_force,
            "mid_time": mid_time,
        }
    )
# Build raw step list
raw_changes = []
for i in range(len(segments) - 1):
    f_before = segments[i]["mean_force"]
    f_after = segments[i + 1]["mean_force"]
    delta_f = f_after - f_before
    boundary_idx = segments[i]["end_idx"]
    change_time = time[boundary_idx]
    raw_changes.append(
        {
            "change_time_s": change_time,
            "force_before_pN": f_before,
            "force_after_pN": f_after,
            "delta_force_pN": delta_f,
            "direction": "up" if delta_f > 0 else "down",
        }
    )
changes_df = pd.DataFrame(raw_changes)
###########################################################
# 4. POST-PROCESSING STEPS
#
# 4a. Merge consecutive steps with SAME direction if they
# happen within MERGE_WINDOW seconds.
#
# Example: down at t=58.63s then down again at t=58.69s
###########################################################
def merge_close_same_direction(df_steps, merge_window):
    if df_steps.empty:
        return df_steps.copy()
    merged = []
    cur = df_steps.iloc[0].to_dict()
    for idx in range(1, len(df_steps)):
        row = df_steps.iloc[idx].to_dict()
        same_dir = (row["direction"] == cur["direction"])
        close_in_time = (row["change_time_s"] - cur["change_time_s"]) <= merge_window
        if same_dir and close_in_time:
            # merge cur and row into a single bigger jump:
            #   new before: cur.force_before_pN
            #   new after:  row.force_after_pN
            #   new delta:  after - before
            merged_force_before = cur["force_before_pN"]
            merged_force_after = row["force_after_pN"]
            merged_delta = merged_force_after - merged_force_before
            cur["force_after_pN"] = merged_force_after
            cur["delta_force_pN"] = merged_delta
            cur["direction"] = "up" if merged_delta > 0 else "down"
            # keep cur["change_time_s"] as the first event time
        else:
            merged.append(cur)
            cur = row
    merged.append(cur)
    return pd.DataFrame(merged)
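# Note: the merge window is measured from the FIRST step of a merged chain
# (cur keeps its original change_time_s), so a run of small same-direction
# steps only keeps merging while each new step falls within MERGE_WINDOW of
# that first event, not of its immediate predecessor.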
merged_df = merge_close_same_direction(changes_df, MERGE_WINDOW)
###########################################################
# 4b. Remove spike pairs:
# If we have two consecutive steps with OPPOSITE direction
# within CANCEL_WINDOW sec, drop both (treat as noise).
#
# Example: up at 284.302s, down at 284.430s (~0.13s)
###########################################################
def remove_opposite_spikes(df_steps, cancel_window):
    if len(df_steps) <= 1:
        return df_steps.copy()
    to_drop = set()
    for i in range(len(df_steps) - 1):
        t1 = df_steps.iloc[i]["change_time_s"]
        t2 = df_steps.iloc[i + 1]["change_time_s"]
        d1 = df_steps.iloc[i]["direction"]
        d2 = df_steps.iloc[i + 1]["direction"]
        if d1 != d2 and (t2 - t1) <= cancel_window:
            # mark both i and i+1 to be dropped
            to_drop.add(i)
            to_drop.add(i + 1)
    keep_rows = [i for i in range(len(df_steps)) if i not in to_drop]
    return df_steps.iloc[keep_rows].reset_index(drop=True)
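# Note: because flagged pairs may overlap (indices i and i+1), a run of
# alternating up/down blips whose consecutive steps are each within
# CANCEL_WINDOW is removed in its entirety, not just the first pair.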
clean_df = remove_opposite_spikes(merged_df, CANCEL_WINDOW)
###########################################################
# 5. Apply amplitude filter |Δforce| >= MIN_STEP (1 pN by default)
###########################################################
filtered_df = clean_df[np.abs(clean_df["delta_force_pN"]) >= MIN_STEP].copy()
##########################################
# 6. Save all versions to Excel
##########################################
with pd.ExcelWriter(outfile) as writer:
    changes_df.to_excel(writer, sheet_name="raw_steps", index=False)
    merged_df.to_excel(writer, sheet_name="merged_same_dir", index=False)
    clean_df.to_excel(writer, sheet_name="after_spike_removal", index=False)
    filtered_df.to_excel(writer, sheet_name="final_ge_1pN", index=False)
print(f"Done. Wrote: {outfile}")
print("Sheets in Excel:")
print(" - raw_steps (direct from ruptures)")
print(" - merged_same_dir (after merging close same-direction jumps)")
print(" - after_spike_removal (after removing fast opposite blips)")
print(" - final_ge_1pN (after |Δforce|≥1 pN filter)")