# Detect jumps with ruptures on Data_Vero_Kymographs force data

# ------------------------------------------------------------
# Requirements:
# pip install pandas numpy ruptures openpyxl
# ------------------------------------------------------------

import pandas as pd
import numpy as np
import ruptures as rpt
import os

########################
# 1. User I/O settings #
########################

infile = "./ForceHFCorrectedForce2x_p853_p502_b3_1_downsampled1000_HF_HF.csv"
outfile = "force_steps.xlsx"

MIN_STEP = 0.4          # pN: minimum |Δforce| for a step to appear in the final sheet
FALLBACK_FS = 78.1      # Hz: assumed sampling rate when the CSV has no time column
PENALTY = 5             # ruptures PELT penalty; higher -> fewer detected changepoints

# post-processing params:
MERGE_WINDOW = 0.10     # sec: merge same-direction steps closer than this in time
CANCEL_WINDOW = 0.20    # sec: drop opposite-direction spike pairs closer than this

#####################################
# 2. Load data and pick time/force  #
#####################################

df = pd.read_csv(infile)

# Force column: first known name found in the CSV, otherwise fall back to the
# last numeric column (heuristic for unlabelled exports).
_FORCE_CANDIDATES = (
    "Corrected Force 2x",
    "Corrected Force",
    "Force",
    "Force HF",
    "force",
    "force_pN",
)
force_col = next((name for name in _FORCE_CANDIDATES if name in df.columns), None)
if force_col is None:
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) == 0:
        raise ValueError("No numeric force column found in CSV.")
    force_col = numeric_cols[-1]

# Time column: first known name found in the CSV, otherwise synthesize a
# uniform time axis from the fallback sampling rate.
_TIME_CANDIDATES = (
    "Time",
    "time",
    "t",
    "timestamp",
    "Timestamp (s)",
    "Seconds",
)
time_col = next((name for name in _TIME_CANDIDATES if name in df.columns), None)
if time_col is None:
    n = len(df)
    dt = 1.0 / FALLBACK_FS
    df["__time_s"] = np.arange(n) * dt
    time_col = "__time_s"

time = df[time_col].to_numpy()
force = df[force_col].to_numpy()

###########################################
# 3. Changepoint detection (ruptures PELT)#
###########################################

# PELT with an L2 cost detects shifts in the mean of the force signal.
algo = rpt.Pelt(model="l2").fit(force)
bkpts = algo.predict(pen=PENALTY)
# bkpts are 1-based segment ends; ruptures always includes len(force) last.

seg_starts = [0] + bkpts[:-1]
seg_ends = list(bkpts)  # exclusive ends, directly usable for slicing

# Summarize each approximately-constant-force segment.
segments = []
for start, end in zip(seg_starts, seg_ends):
    seg_force = force[start:end]
    mid_idx = start + (len(seg_force) // 2)
    segments.append(
        {
            "start_idx": start,
            "end_idx": end - 1,                 # inclusive end index
            "mean_force": np.mean(seg_force),
            "mid_time": time[mid_idx],          # timestamp at segment midpoint
        }
    )

# Build the raw step list: one entry per boundary between adjacent segments.
raw_changes = []
for i in range(len(segments) - 1):
    f_before = segments[i]["mean_force"]
    f_after = segments[i + 1]["mean_force"]
    delta_f = f_after - f_before
    # The step is timestamped at the last sample of the earlier segment.
    boundary_idx = segments[i]["end_idx"]
    raw_changes.append(
        {
            "change_time_s": time[boundary_idx],
            "force_before_pN": f_before,
            "force_after_pN": f_after,
            "delta_force_pN": delta_f,
            "direction": "up" if delta_f > 0 else "down",
        }
    )

changes_df = pd.DataFrame(raw_changes)

###########################################################
# 4. POST-PROCESSING STEPS
#
# 4a. Merge consecutive steps with SAME direction if they
#     happen within MERGE_WINDOW seconds.
#
#     Example: down at t=58.63s then down again at t=58.69s
###########################################################

def merge_close_same_direction(df_steps, merge_window):
        """Collapse runs of same-direction steps closer than *merge_window* seconds.

        Consecutive rows whose ``direction`` matches and whose ``change_time_s``
        lies within ``merge_window`` of the running event's time are folded into
        one step: the merged step keeps the first event's time and
        ``force_before_pN``, takes the last event's ``force_after_pN``, and has
        its delta and direction recomputed from those endpoints.
        """
        if df_steps.empty:
                return df_steps.copy()

        out_rows = []
        current = df_steps.iloc[0].to_dict()

        for _, step in df_steps.iloc[1:].iterrows():
                step = step.to_dict()
                within_window = (step["change_time_s"] - current["change_time_s"]) <= merge_window

                if within_window and step["direction"] == current["direction"]:
                        # Fold this step into the running one: the merged jump
                        # spans from the run's first "before" to this "after".
                        new_delta = step["force_after_pN"] - current["force_before_pN"]
                        current["force_after_pN"] = step["force_after_pN"]
                        current["delta_force_pN"] = new_delta
                        current["direction"] = "up" if new_delta > 0 else "down"
                        # change_time_s stays at the first event of the run.
                else:
                        out_rows.append(current)
                        current = step

        out_rows.append(current)
        return pd.DataFrame(out_rows)

# Pass 1: fold fast trains of same-direction steps into single larger jumps.
merged_df = merge_close_same_direction(changes_df, MERGE_WINDOW)

###########################################################
# 4b. Remove spike pairs:
#     If we have two consecutive steps with OPPOSITE direction
#     within CANCEL_WINDOW sec, drop both (treat as noise).
#
#     Example: up at 284.302s, down at 284.430s (~0.13s)
###########################################################

def remove_opposite_spikes(df_steps, cancel_window):
        """Drop adjacent opposite-direction step pairs closer than *cancel_window* s.

        An up immediately followed by a nearby down (or vice versa) is treated
        as a transient blip rather than a real step; both rows of such a pair
        are removed. A row that partners with blips on both sides is dropped
        only once.
        """
        if len(df_steps) <= 1:
                return df_steps.copy()

        times = df_steps["change_time_s"].to_numpy()
        dirs = df_steps["direction"].to_numpy()

        drop = set()
        for i, (t_next, d_next) in enumerate(zip(times[1:], dirs[1:])):
                if dirs[i] != d_next and (t_next - times[i]) <= cancel_window:
                        drop.update((i, i + 1))

        keep = [i for i in range(len(df_steps)) if i not in drop]
        return df_steps.iloc[keep].reset_index(drop=True)

# Pass 2: drop fast opposite-direction blip pairs (noise spikes).
clean_df = remove_opposite_spikes(merged_df, CANCEL_WINDOW)

###########################################################
# 5. Apply amplitude filter |Δforce| >= MIN_STEP pN
#    (NOTE: the threshold actually applied is MIN_STEP,
#    currently 0.4 pN — despite the "final_ge_1pN" sheet name)
###########################################################

filtered_df = clean_df[np.abs(clean_df["delta_force_pN"]) >= MIN_STEP].copy()

##########################################
# 6. Save all versions to Excel
##########################################

# =========================
# Helpers
# =========================

def round_df(d: pd.DataFrame, nd: int = 3) -> pd.DataFrame:
        """Return a copy of *d* with every float column rounded to *nd* decimals.

        Non-float columns (ints, strings, ...) pass through untouched; the
        input frame is never modified.
        """
        out = d.copy()
        for col in out.select_dtypes(include=[np.floating]).columns:
                out[col] = out[col].round(nd)
        return out

# Write every processing stage to its own sheet so intermediate results can be
# inspected side by side. Requires openpyxl as the Excel engine.
with pd.ExcelWriter(outfile) as writer:
        changes_df.to_excel(writer, sheet_name="raw_steps", index=False)
        merged_df.to_excel(writer, sheet_name="merged_same_dir", index=False)
        clean_df.to_excel(writer, sheet_name="after_spike_removal", index=False)
        # Sheet name kept as "final_ge_1pN" for compatibility with downstream
        # consumers, but the threshold actually applied is MIN_STEP.
        round_df(filtered_df).to_excel(writer, sheet_name="final_ge_1pN", index=False)

print(f"Done. Wrote: {outfile}")
print("Sheets in Excel:")
print(" - raw_steps              (direct from ruptures)")
print(" - merged_same_dir        (after merging close same-direction jumps)")
print(" - after_spike_removal    (after removing fast opposite blips)")
# Report the threshold actually used (MIN_STEP), not a hard-coded "1 pN".
print(f" - final_ge_1pN           (after |Δforce|≥{MIN_STEP} pN filter)")
