Commit 0f33087e authored by Elias

matching script

parent 05f9e714
@@ -96,6 +96,14 @@ python3 csv_en_mots.py mots.csv
python3 decouper_wav_mots_uniquement.py annotation file "mot1" "mot2" "mot3" ...
```
## 2. Matching with similarity
### • This script (inspired by [the GitHub repository of the paper: Spoken-Term Discovery using Discrete Speech Units](https://github.com/bshall/dusted)) assumes you have a `dusted` folder containing the `match_2.py` script
**matchQueryHS.py:** Output: one CSV file per query, listing every match with its similarity score (columns: `file`, `t0`, `tn`, `tokens`, `score`)
```bash
python3 matchQueryHS.py /path/to/your/turn/of/speech/npz /path/to/your/queries/npz /path/to/the/output/folder --W1 1 --W2 1
```
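The per-query CSVs written by `matchQueryHS.py` have the columns `file`, `t0`, `tn`, `tokens` and `score`, where `t0`/`tn` are the start and end of the matched span. A minimal sketch for loading one result file and keeping only high-scoring matches; the file name and the 8.0 cut-off are illustrative values, not part of the pipeline:
```python
import csv

# Load one per-query result CSV written by matchQueryHS.py
# (the path and the score cut-off are example values).
with open("output/animaux_chien.csv") as f:
    rows = list(csv.DictReader(f))

kept = [r for r in rows if float(r["score"]) >= 8.0]
for r in kept:
    print(r["file"], r["t0"], r["tn"], r["score"])
```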
# Tools
### • Counting the number of segments
import argparse
from pathlib import Path
from tqdm import tqdm
from functools import partial
from multiprocessing import Pool, Manager, Process
import torch
import numpy as np
import csv
import matplotlib.pyplot as plt
import numba
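# Debug helper: recompute the scoring matrix H between two token sequences and print it
# to the terminal, once for every match returned by match_rescore().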
def print_dtw_terminal(x_tokens, y_tokens, W1, W2, threshold):
from dusted.match_2 import match_rescore
from dusted.match_2 import score
x_tokens = np.array(x_tokens, dtype=np.int64)
y_tokens = np.array(y_tokens, dtype=np.int64)
print(x_tokens)
print(y_tokens)
H, T = score(x_tokens, y_tokens, W1, W2)
    Ht = H.T  # transpose to display correctly
starts = np.argwhere(H == np.max(H))
start = starts[starts.sum(axis=-1).argmin()]
for path, a, b, sim in match_rescore(x_tokens, y_tokens, W1, W2, threshold):
path_set = {(j, i) for (i, j) in path}
print("\nDTW Matrix (H) :")
header = " " + " ".join(f"{t:4}" for t in y_tokens)
print(header)
for i, x in enumerate(x_tokens):
row = f"{x:4} "
for j, y in enumerate(y_tokens):
val = H[i+1, j+1]
row += f" {val:.2f} "
print(row)
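# Writer process: drains batches of rows from a multiprocessing queue and appends them
# to a single CSV file (columns: file, t0, tn, tokens, score) until a "DONE" sentinel arrives.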
def write_results_to_file(out_path, queue):
out_path.parent.mkdir(exist_ok=True, parents=True)
with open(out_path, "w") as file:
writer = csv.writer(file)
writer.writerow(["file", "t0", "tn", "tokens", "score"])
while True:
result = queue.get()
if result == "DONE":
break
writer.writerows(result)
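# Match one query against one target utterance. Each .npz is expected to hold "codes"
# (discrete units) and "boundaries" (segment boundaries in frames); the * 0.02 factor
# below assumes 20 ms frames when converting boundaries to seconds.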
def process_pair(target_path, match, sim, query_path, W1, W2, threshold=None, min_duration=0.2, visualize=False, out_dir=None):
print(query_path)
print(target_path)
x_segments = np.load(query_path)
y_segments = np.load(target_path)
x_codes, x_boundaries = x_segments["codes"], x_segments["boundaries"]
y_codes, y_boundaries = y_segments["codes"], y_segments["boundaries"]
    # if no threshold is given, use the length of the query
if threshold is None:
threshold = len(x_codes)
print(f"[INFO] Query={query_path.stem}, threshold={threshold}")
print(x_codes)
print(len(x_codes))
print(len(y_codes))
matches = []
print(threshold)
paths = match(x_codes, y_codes, W1, W2,threshold)
print(paths)
for i, (path, a, b, score_val) in enumerate(paths):
a0, b0 = path[0]
an, bn = path[-1]
a0 = round(x_boundaries[a0 - 1] * 0.02, 2)
an = round(x_boundaries[an] * 0.02, 2)
b0 = round(y_boundaries[b0 - 1] * 0.02, 2)
bn = round(y_boundaries[bn] * 0.02, 2)
atokens = [t for t in a if t != -1]
btokens = [t for t in b if t != -1]
matches.append((query_path.stem, a0, an, " ".join(map(str, atokens)), score_val))
matches.append((target_path.stem, b0, bn, " ".join(map(str, btokens)), score_val))
        print(f"\nDTW between {query_path.stem} and {target_path.stem}: {len(path)} alignments (matched tokens)")
print_dtw_terminal(atokens, btokens, W1, W2, threshold)
return matches
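# Run one query against every .npz file in the haystack: a writer process drains a shared
# queue into the output CSV while a Pool of workers runs process_pair in parallel.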
def do_match(args, query_path, W1_value, W2_value, out_path, visualize=False):
segment_paths = sorted(args.segments_dir.rglob("*.npz"))
from dusted.match_2 import match_rescore as match_fn
print(segment_paths)
all_tokens = []
for seg_path in segment_paths + [query_path]:
seg = np.load(seg_path)
all_tokens.extend(seg["codes"])
max_token = int(np.max(all_tokens))
with Pool(processes=args.processes) as pool, Manager() as manager:
queue = manager.Queue()
writer = Process(target=write_results_to_file, args=(out_path, queue))
writer.start()
match_pair = partial(
process_pair,
match=match_fn,
sim=[],
W1=W1_value,
W2=W2_value,
threshold=None,
min_duration=args.min_duration,
query_path=query_path,
out_dir=args.out_dir,
visualize=visualize
)
for result in tqdm(
pool.imap(match_pair, segment_paths, chunksize=args.chunksize),
total=len(segment_paths),
desc=f"{query_path.stem} | W1={W1_value} W2={W2_value}"
):
if result:
queue.put(result)
queue.put("DONE")
writer.join()
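# Expected layout: haystack_dir holds the target .npz segment files, queries_root holds one
# sub-folder per query set (each with query .npz files), and out_dir receives one CSV per query.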
if __name__ == "__main__":
parser = argparse.ArgumentParser(
        description="Match queries against a haystack of target utterances"
)
    parser.add_argument("haystack_dir", type=Path, help="Directory containing the haystack npz files (the fixed targets)")
    parser.add_argument("queries_root", type=Path, help="Directory containing sub-folders of query npz files")
    parser.add_argument("out_dir", type=Path, help="Output directory for the CSV results")
parser.add_argument("--W1", type=float, default=1.0, help="gap cost insertion")
parser.add_argument("--W2", type=float, default=1.0, help="gap cost deletion")
parser.add_argument("--threshold", type=float, default=6)
parser.add_argument("--min_duration", type=float, default=0.2)
parser.add_argument("--processes", type=int, default=10)
parser.add_argument("--chunksize", type=int, default=200)
    parser.add_argument("--visualize", action="store_true", help="Save the DTW matrix for each match")
args = parser.parse_args()
args.out_dir.mkdir(parents=True, exist_ok=True)
args.segments_dir = args.haystack_dir
    # Iterate over each query sub-folder
for subdir in sorted(args.queries_root.iterdir()):
if not subdir.is_dir():
continue
query_files = sorted(subdir.glob("*.npz"))
if not query_files:
continue
        print(f"\nSub-folder {subdir.name}: {len(query_files)} queries")
used_names = {}
for query_path in query_files:
print("-"*50)
print(query_path)
base_name = query_path.stem
count = used_names.get(base_name, 0)
used_names[base_name] = count + 1
if count == 0:
csv_name = f"{subdir.name}_{base_name}.csv"
else:
csv_name = f"{subdir.name}_{base_name}_{count}.csv"
out_path = args.out_dir / csv_name
print(f" Matching {query_path.name} → {csv_name}")
do_match(
args,
query_path,
W1_value=args.W1,
W2_value=args.W2,
out_path=out_path,
visualize=args.visualize
)
import numba
import numpy as np
@numba.njit()
def match(x, y, sim, W1, W2):
"""Find a similar unit sub-sequence between two utterances.
Args:
x (NDArray): discrete units for the first utterance of shape (N,).
y (NDArray): discrete units for the second utterance of shape (M,).
        sim (NDArray): substitution matrix of shape (K, K) where K is the total number of discrete units; unused here, the substitution score is hard-coded to +1 for identical units and -1 otherwise.
        W1 (float): gap penalty for aligning a unit of x against a gap.
        W2 (float): gap penalty for aligning a unit of y against a gap.
Returns:
NDArray[Tuple(int,int)]: list of aligned indices in x and y.
NDArray[int]: matching sub-sequence in x.
NDArray[int]: matching sub-sequence in y.
float: similarity score.
"""
H, T = score(x, y, W1, W2)
print("H", H)
similarity = np.max(H)
starts = np.argwhere(H == similarity)
start = starts[starts.sum(axis=-1).argmin()]
path, a, b = backtrace(T, start, x, y)
return path, a, b, similarity
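# Recompute the scoring matrix after a match has been extracted: cells on the matched path
# are marked as visited and forced to zero so that later matches cannot overlap it.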
@numba.njit()
def rescore(H, T, x, y, W1, W2, visited, start):
M = np.copy(H)
scores = np.zeros(4, dtype=np.float32)
istart, jstart = start
jend = jstart
for i in range(istart, M.shape[0]):
jinc = jstart
jmatched = False
for j in range(jstart, M.shape[1]):
if visited[i, j]:
M[i, j] = 0
T[i, j] = 0
continue
scores[1] = M[i - 1, j - 1] + int(1 if x[i-1]==y[j-1] else -1) #sim[x[i - 1], y[j - 1]]
scores[2] = M[i - 1, j] - W1
scores[3] = M[i, j - 1] - W2
k = np.argmax(scores)
M[i, j] = scores[k]
T[i, j] = k
if M[i, j] == H[i, j]:
if j == jinc:
jstart += 1
elif j >= jend:
jmatched = True
jend = j
break
if not jmatched:
jend = M.shape[1] - 1
if jinc == jend:
break
return M, T
@numba.njit()
def match_rescore(
x: np.ndarray, y: np.ndarray, W1: float, W2: float, threshold: float = 6
):
"""Find similar unit sub-sequences between two utterances.
Args:
x (NDArray): discrete units for the first utterance of shape (N,).
y (NDArray): discrete units for the second utterance of shape (M,).
        W1 (float): gap penalty for aligning a unit of x against a gap.
        W2 (float): gap penalty for aligning a unit of y against a gap.
        threshold (float): similarity threshold for matches (defaults to 6).
Yields:
NDArray[Tuple(int,int)]: list of aligned indices in x and y.
NDArray[int]: matching sub-sequence in x.
NDArray[int]: matching sub-sequence in y.
float: similarity score.
Notes:
The function finds multiple matches by recomputing the scoring matrix `H` after each match is found.
This allows the discovery of secondary matches that are locally optimal but do not overlap with previously identified matches.
"""
H, T = score(x, y, W1, W2)
print("H rescore : ",H)
print("T rescore : ", T)
visited = np.zeros_like(H, dtype=np.bool_)
while True:
similarity = np.max(H)
if similarity < threshold:
break
starts = np.argwhere(H == similarity)
start = starts[starts.sum(axis=-1).argmin()]
path, a, b = backtrace(T, start, x, y)
yield path, a, b, similarity
for i, j in path:
visited[i, j] = True
H, T = rescore(H, T, x, y, W1, W2, visited, path[0])
similarity = np.max(H)
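# Smith-Waterman-style local alignment: +1 for identical units, -1 otherwise, W1/W2 as gap
# penalties, and scores floored at zero. T records the chosen move for backtracing
# (0 = stop, 1 = substitution, 2 = consume a unit of x only, 3 = consume a unit of y only).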
@numba.njit()
def score(x, y, W1, W2):
n, m = len(x), len(y)
H = np.zeros((n + 1, m + 1), dtype=np.float32)
T = np.full((n + 1, m + 1), 0, dtype=np.int16)
print("x", x, ", y ", y)
scores = np.zeros(4, dtype=np.float32)
for i in range(1, n + 1):
for j in range(1, m + 1):
scores[1] = H[i - 1, j - 1] + int(1 if x[i-1]==y[j-1] else -1) #sim[x[i - 1], y[j - 1]]
scores[2] = H[i - 1, j] - W1
scores[3] = H[i, j - 1] - W2
k = np.argmax(scores)
H[i, j] = scores[k]
T[i, j] = k
return H, T
@numba.njit()
def backtrace(T, start, x, y, blank=-1):
i, j = start
path = []
a = []
b = []
while T[i, j] != 0 and (i > 0 or j > 0):
path.append((i, j))
if T[i, j] == 1: # substitution
i -= 1
j -= 1
a.append(x[i])
b.append(y[j])
elif T[i, j] == 2: # deletion
i -= 1
a.append(x[i])
b.append(blank)
elif T[i, j] == 3: # insertion
j -= 1
a.append(blank)
b.append(y[j])
path.reverse()
a.reverse()
b.reverse()
return path, a, b
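# Minimal usage sketch. The unit values, gap costs and threshold below are illustrative only;
# in the pipeline above they come from the segment .npz files and the CLI arguments.
if __name__ == "__main__":
    x = np.array([5, 3, 9, 9], dtype=np.int64)  # discrete units of a query
    y = np.array([7, 5, 3, 9], dtype=np.int64)  # discrete units of a target utterance
    # match_rescore() yields every non-overlapping match whose local similarity reaches
    # the threshold; for these toy units the shared 5 3 9 sub-sequence scores 3.0.
    for path, a, b, sim in match_rescore(x, y, 1.0, 1.0, 2.0):
        print(path, a, b, sim)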