Skip to content
Snippets Groups Projects
Commit c1bd0f7c authored by Guan, Yiying's avatar Guan, Yiying
Browse files

Upload New File

parent 622c9ea6
No related branches found
No related tags found
No related merge requests found
import pandas as pd
import numpy as np
from transformers import CLIPProcessor, CLIPModel
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, confusion_matrix
from sklearn.preprocessing import LabelEncoder
from transformers import BertTokenizer, BertModel
import torch
from PIL import Image
# Shared BERT tokenizer/encoder reused by get_bert_embeddings() so the model
# is loaded once at import time rather than per call.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
# Fall back to CPU when no GPU is present instead of crashing at import time;
# callers must pass a matching `device` to get_bert_embeddings().
bert_model = BertModel.from_pretrained('bert-base-uncased').to(
    'cuda' if torch.cuda.is_available() else 'cpu')
def calculate_i2t_relevance(device, texts, image_path, batch_size=32):
    """Score how relevant each text is to one image using CLIP logits.

    A fresh CLIP model is loaded on every call and run against a single
    image, batching only over the texts.

    Args:
        device: torch device the CLIP model and inputs are moved to.
        texts: list of strings to score against the image.
        image_path: the image input — passed straight to CLIPProcessor, so
            presumably a PIL image or other accepted image type; TODO confirm
            callers never pass a bare path string.
        batch_size: number of texts per forward pass.

    Returns:
        List of image->text logits, one float per entry in ``texts``.
    """
    clip_model_id = "openai/clip-vit-base-patch32"
    clip_processor = CLIPProcessor.from_pretrained(clip_model_id)
    clip_model = CLIPModel.from_pretrained(clip_model_id).to(device)
    image = image_path
    clip_model.eval()
    score_list = []
    # Process texts in batches to bound memory use.
    with torch.no_grad():
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i + batch_size]
            # One image scored against the whole text batch; CLIP's text
            # encoder caps input at 77 tokens, hence the truncation.
            inputs = clip_processor(text=batch_texts, images=image,
                                    return_tensors="pt", padding=True,
                                    truncation=True, max_length=77).to(device)
            outputs = clip_model(**inputs)
            # logits_per_image has shape (1, len(batch_texts)); row 0 holds
            # the scores for every text in this batch.
            scores = outputs.logits_per_image.detach().cpu().numpy()
            score_list.extend(scores[0])
    return score_list
# def get_bert_embeddings(texts, device):
# tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
# bert_model = BertModel.from_pretrained('bert-base-uncased').to(device)
# bert_model.eval()
# with torch.no_grad():
# inputs = tokenizer(texts, return_tensors='pt', padding=True, truncation=True, max_length=512).to(device)
# outputs = bert_model(**inputs)
# embeddings = outputs.last_hidden_state.mean(dim=1).detach().cpu().numpy()
# return embeddings
#
# from transformers import BertTokenizer, BertModel
# import torch
def get_bert_embeddings(texts, device, batch_size=16):
    """Encode texts into mean-pooled BERT embeddings.

    Uses the module-level ``tokenizer`` and ``bert_model``; texts are run
    through BERT in mini-batches and each text's token embeddings are
    averaged over the sequence dimension.

    Args:
        texts: list of strings to embed.
        device: device the tokenized inputs are moved to (must match the
            device ``bert_model`` lives on).
        batch_size: number of texts per forward pass.

    Returns:
        numpy array of shape (len(texts), hidden_size).
    """
    bert_model.eval()
    pooled = []
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            chunk = texts[start:start + batch_size]
            encoded = tokenizer(chunk, return_tensors='pt', padding=True,
                                truncation=True, max_length=512).to(device)
            hidden = bert_model(**encoded).last_hidden_state
            # Mean-pool over the token (sequence) dimension.
            for vec in hidden.mean(dim=1).detach().cpu().numpy():
                pooled.append(vec)
    return np.array(pooled)
def filter_sentences(df, col_name='relevance', k_top=5, k_bottom=5):
    """Keep the k_top most and k_bottom least relevant sentences of a group.

    Rows are ranked by ``col_name`` descending; the top ``k_top`` and the
    bottom ``k_bottom`` rows are kept and returned in original (index) order.
    When ``k_top + k_bottom`` exceeds the number of rows the two selections
    overlap; overlapping picks are deduplicated so no sentence is returned
    twice (the original implementation duplicated them via ``df.loc``).

    Args:
        df: DataFrame with a 'text' column and a ``col_name`` score column.
        col_name: column used for relevance ranking.
        k_top: number of highest-scoring rows to keep.
        k_bottom: number of lowest-scoring rows to keep.

    Returns:
        List of the selected 'text' values in original row order.
    """
    df_sorted = df.sort_values(by=col_name, ascending=False)
    top_indices = df_sorted.head(k_top).index
    bottom_indices = df_sorted.tail(k_bottom).index
    # unique() drops overlap between the top and bottom picks; without it,
    # df.loc would repeat the shared rows in the output.
    selected_indices = top_indices.append(bottom_indices).unique()
    filtered_sentences = df.loc[selected_indices].sort_index()
    combined_sentences = filtered_sentences['text'].tolist()
    return combined_sentences
def train_pic_relevance(device, ad_indices, hc_indices, combined_data, combined_original, svc_kernel='linear',
                        iterations=600, ifaug=False, ifprompt=False, raw_sent=None, raw_original=None):
    """Repeatedly train/evaluate an SVC on BERT embeddings of filtered transcripts.

    Each iteration samples 60 AD and 60 HC subjects for training and 15 of
    each for testing. For every (k_top, k_bottom) pair in 0..10 (except 0/0),
    each subject's sentences are filtered by relevance score, joined into one
    document, embedded with BERT, and classified with an SVC; weighted
    metrics are recorded per configuration.

    Args:
        device: device used for the BERT forward passes.
        ad_indices, hc_indices: candidate subject indices for each class.
        combined_data: sentence-level DataFrame ('id', 'text', 'relevance',
            plus 'text_type' when ifaug is True).
        combined_original: subject-level DataFrame ('id', 'intent'; plus
            'id_original' and 'text_type' when ifaug is True).
        svc_kernel: kernel passed to sklearn.svm.SVC.
        iterations: number of random train/test resamples.
        ifaug: when True, subjects are matched through 'id_original' and test
            rows are restricted to text_type == 'text'.
        ifprompt: when True (with ifaug), test data is drawn from the raw
            (un-augmented) frames instead of the augmented ones.
        raw_sent, raw_original: un-augmented counterparts of combined_data /
            combined_original; required only when ifprompt is True.

    Returns:
        List of dicts, one per (iteration, k_top, k_bottom), with accuracy,
        recall, precision, F1 (all in percent) and the confusion matrix.
    """
    iteration_results = []
    for iteration in range(iterations):
        # Randomly draw 60 train / 15 test subjects per class, disjointly.
        train_ad_indices = np.random.choice(ad_indices, 60, replace=False)
        train_hc_indices = np.random.choice(hc_indices, 60, replace=False)
        test_ad_indices = np.random.choice(list(set(ad_indices) - set(train_ad_indices)), 15, replace=False)
        test_hc_indices = np.random.choice(list(set(hc_indices) - set(train_hc_indices)), 15, replace=False)
        train_indices = np.concatenate([train_ad_indices, train_hc_indices])
        test_indices = np.concatenate([test_ad_indices, test_hc_indices])
        # .copy() below: 'filtered_text' is assigned into these frames later;
        # copying avoids SettingWithCopyWarning and mutating caller data.
        if not ifaug:
            train_original = combined_original.loc[train_indices].sort_index().copy()
            test_original = combined_original.loc[test_indices].sort_index().copy()
            # Sentence rows belonging to the selected subjects.
            train_sent_data = combined_data.loc[combined_data['id'].isin(train_original['id'])]
            test_sent_data = combined_data.loc[combined_data['id'].isin(test_original['id'])]
        elif not ifprompt:
            # Augmented setting: subjects are matched through 'id_original'.
            train_original = combined_original.loc[combined_original['id_original'].isin(train_indices)].sort_index().copy()
            test_original = combined_original.loc[combined_original['id_original'].isin(test_indices)].sort_index()
            # Evaluate only on un-augmented ('text') rows.
            test_original = test_original[test_original['text_type'] == 'text'].copy()
            train_sent_data = combined_data.loc[combined_data['id'].isin(train_original['id'])]
            test_sent_data = combined_data.loc[combined_data['id'].isin(test_original['id']) & (combined_data['text_type'] == 'text')]
        else:
            # Prompt setting: train on augmented data, test on the raw frames.
            train_original = combined_original.loc[combined_original['id_original'].isin(train_indices)].sort_index().copy()
            test_original = raw_original.loc[raw_original['id_original'].isin(test_indices)].sort_index()
            test_original = test_original[test_original['text_type'] == 'text'].copy()
            train_sent_data = combined_data.loc[combined_data['id'].isin(train_original['id'])]
            test_sent_data = raw_sent.loc[raw_sent['id'].isin(test_original['id']) & (raw_sent['text_type'] == 'text')]
        for k_top in range(0, 11):
            for k_bottom in range(0, 11):
                if k_top == 0 and k_bottom == 0:
                    continue
                # Per subject: keep the k_top most and k_bottom least relevant
                # sentences and re-join them into one document.
                # NOTE(review): assigning via list() assumes groupby('id')
                # yields groups in the same order as the sorted *_original
                # rows — this holds only if index order tracks id order;
                # confirm against the data-building code.
                train_filtered_text = train_sent_data.groupby('id').apply(
                    lambda x: ' '.join(filter_sentences(x, k_top=k_top, k_bottom=k_bottom))).reset_index(level=0,
                                                                                                         drop=True)
                test_filtered_text = test_sent_data.groupby('id').apply(
                    lambda x: ' '.join(filter_sentences(x, k_top=k_top, k_bottom=k_bottom))).reset_index(level=0,
                                                                                                         drop=True)
                train_original['filtered_text'] = list(train_filtered_text)
                test_original['filtered_text'] = list(test_filtered_text)
                # Encode the class labels ('intent') as integers.
                label_encoder = LabelEncoder()
                train_labels = label_encoder.fit_transform(train_original['intent'])
                test_labels = label_encoder.transform(test_original['intent'])
                # Embed the filtered documents with BERT.
                train_embeddings = get_bert_embeddings(train_original['filtered_text'].tolist(), device)
                test_embeddings = get_bert_embeddings(test_original['filtered_text'].tolist(), device)
                # Train and evaluate the classifier.
                classifier = SVC(kernel=svc_kernel)
                classifier.fit(train_embeddings, train_labels)
                test_predictions = classifier.predict(test_embeddings)
                accuracy = accuracy_score(test_labels, test_predictions)
                recall = recall_score(test_labels, test_predictions, average='weighted')
                precision = precision_score(test_labels, test_predictions, average='weighted')
                f1 = f1_score(test_labels, test_predictions, average='weighted')
                conf_matrix = confusion_matrix(test_labels, test_predictions).tolist()
                print(
                    f"Iteration {iteration + 1}, k_top = {k_top}, k_bottom = {k_bottom}, Test Accuracy: {accuracy * 100:.2f}%, Recall: {recall * 100:.2f}%, Precision: {precision * 100:.2f}%, F1: {f1 * 100:.2f}%")
                iteration_results.append({
                    'iteration': iteration + 1,
                    'k_top': k_top,
                    'k_bottom': k_bottom,
                    'Test Accuracy': accuracy * 100,
                    'Recall': recall * 100,
                    'Precision': precision * 100,
                    'F1 Score': f1 * 100,
                    'Confusion Matrix': conf_matrix
                })
                # Release GPU memory between configurations.
                del train_embeddings
                del test_embeddings
                torch.cuda.empty_cache()
    return iteration_results
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment