Commit c516942e authored by Joel Oksanen

Cleaned up code

parent 21fae9ef
@@ -4,7 +4,6 @@ __pycache__/
server/agent/amazon_data/
server/agent/SA/data/
server/agent/target_extraction/data/
server/agent/target_extraction/stanford-corenlp-full-2018-10-05
server/agent/target_extraction/BERT/data/
.DS_Store
*.pickle
class FeatureCounter:
def __init__(self):
        self.dep_features = {}  # maps dependency feature string -> feature index
self.n_dep_features = 0
def indexof(self, feature, learning):
if feature in self.dep_features:
return self.dep_features[feature]
elif learning:
self.dep_features[feature] = self.n_dep_features
self.n_dep_features += 1
return self.dep_features[feature]
else:
return None
def count(self):
return self.n_dep_features
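# Illustrative usage sketch (not part of the original file): indexof grows the index while
# learning and returns None for features unseen at training time.
# fc = FeatureCounter()
# fc.indexof('love_arg2', learning=True)   # -> 0 (new feature)
# fc.indexof('great_arg1', learning=True)  # -> 1
# fc.indexof('love_arg2', learning=False)  # -> 0 (already known)
# fc.indexof('bad_arg1', learning=False)   # -> None (unseen)
# fc.count()                               # -> 2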
from nltk.tree import ParentedTree as Tree
class Instance:
def __init__(self, xml):
self.text = xml.find('text').text
self.opinion = xml.find('opinion').text
self.tree = Tree.fromstring(xml.find('tree').text)
import xml.etree.ElementTree as ET
from nltk.tree import ParentedTree as Tree
from sklearn import svm
from vectorizer import Vectorizer
from gensim.test.utils import get_tmpfile
from gensim.models import Word2Vec
from gensim.models.doc2vec import Doc2Vec, TaggedDocument
from sklearn.metrics import confusion_matrix
from instance import Instance
from sklearn.feature_extraction.text import CountVectorizer
import os
import math
def resample_data(instances, labels):
label_instances = {label: [instance for instance in instances if instance.opinion == label] for label in labels}
max_n_instances = max([len(v) for v in label_instances.values()])
resampled_data = []
for label in labels:
m = math.ceil(max_n_instances / len(label_instances[label]))
label_instances[label] = (label_instances[label] * m)[:max_n_instances]
resampled_data += label_instances[label]
print(len(resampled_data))
return resampled_data
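# Worked example with hypothetical counts: given 10 'positive' and 4 'negative' instances,
# max_n_instances = 10 and m = ceil(10/4) = 3 for 'negative', so the negatives are repeated
# three times and truncated to 10, leaving every label with 10 instances.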
class SentimentAnalyzer:
    expr_clf = svm.SVC()  # classifies the sentiment expressed towards ARG
def train_expr_clf(self, instances):
fvs = [instance.vector for instance in instances]
targets = [instance.opinion for instance in instances]
self.expr_clf.fit(fvs, targets)
def get_feature_vector(self, instance):
return FeatureVector(instance, None)
    # in: list of instances with precomputed feature vectors
    # out: predicted sentiment label towards ARG for each instance
def expresses_sentiment(self, instances):
return self.expr_clf.predict([instance.vector for instance in instances])
semeval_2014_train_path = 'server/agent/SA/data/SemEval-2014/SemEval_2014_Laptop_Train_with_labelled_parse_trees.xml'
semeval_2014_test_path = 'server/agent/SA/data/SemEval-2014/SemEval_2014_Laptop_Test_with_labelled_parse_trees.xml'
amazon_train_path = 'data/Amazon/amazon_camera_train.xml'
amazon_test_path = 'data/Amazon/amazon_camera_test2.xml' # 'data/Amazon/prepared_amazon_camera_reviews.xml'
semeval_train_path = 'server/agent/SA/data/SemEval-2016/ABSA16_Laptops_Train_SB1_v2_with_labelled_parse_trees.xml'
semeval_test_path = 'server/agent/SA/data/SemEval-2016/ABSA16_Laptops_Test_with_labelled_parse_trees.xml'
tweet_train_path = 'server/agent/SA/data/acl-14-short-data/tweet_train_with_labelled_parse_trees.xml'
tweet_test_path = 'server/agent/SA/data/acl-14-short-data/tweet_test_with_labelled_parse_trees.xml'
train_path = semeval_2014_train_path
test_path = semeval_2014_test_path
labels = ['positive', 'neutral', 'negative', 'conflict']
sa = SentimentAnalyzer()
train_tree = ET.parse(train_path)
train_instances = [Instance(instance) for instance in train_tree.getroot()]
# train_instances = resample_data(train_instances, labels)
# create and train vectorizer model
vec = Vectorizer(train_instances)
# train classifier for sentiment expression
sa.train_expr_clf(train_instances)
test_tree = ET.parse(test_path)
test_instances = [Instance(instance) for instance in test_tree.getroot()]
# obtain feature vectors and targets for test set
vec.vectorize(test_instances)
# predict test set values
pred = sa.expresses_sentiment(test_instances)
targets = [instance.opinion for instance in test_instances]
# evaluate results
cm = confusion_matrix(targets, pred, labels=labels)
acc = len([i for i in range(len(targets)) if targets[i] == pred[i]]) / len(targets)
print(cm)
print('accuracy:', acc)
@@ -13,7 +13,7 @@ from sty import fg, bg, ef, rs
from wcwidth import wcswidth
data_location = 'data/reviews/5_products_reviews.tsv'
-selected_reviews_location = 'product_reviews_to_be_annotated.xml'
+selected_reviews_location = 'reviews_to_be_annotated.xml'
min_characters = 0
max_characters = 200
n = 500
@@ -23,7 +23,7 @@ ann_fgs = {'positive': fg.green, 'neutral': fg.blue, 'negative': fg.red, 'confli
annotated_reviews_location = 'annotated_camera_reviews.xml'
included_labels = ['NN', 'NNS', 'NP', 'NNP', 'NNPS', 'DT', 'CD', 'FW', 'PRP$']
nouns = ['NN', 'NNS', 'NP', 'NNP', 'NNPS']
-prepared_reviews_location = 'annotated_5_products_reviews_2.xml'
+prepared_reviews_location = 'annotated_reviews.xml'
tokenizer = TweetTokenizer()
sent_tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer, TfidfTransformer
import numpy as np
import pandas as pd
from feature_counter import FeatureCounter
from nltk.tree import ParentedTree as Tree
from nltk.stem import PorterStemmer
class Vectorizer:
sentiment_lexicon = pd.read_csv('server/agent/SA/data/SocialSent/2000.tsv', index_col=0, header=None, names=['mean', 'std'], sep='\t', error_bad_lines=False)
negations = ['not', 'no', 'never', 'n\'t', 'neither', 'seldom', 'hardly']
copulas = ['is', '\'s', 'was', 'were']
    adjectives_and_nouns = ['JJ', 'JJR', 'JJS', 'ADJP', 'NN', 'NNS', 'NP']
ps = PorterStemmer()
def __init__(self, train_instances):
self.transformer = TfidfTransformer()
# indep features:
self.bow_vectorizer = CountVectorizer(stop_words='english', ngram_range=(1,5))
texts = [instance.text for instance in train_instances]
train_bow_vectors = self.bow_vectorizer.fit_transform(texts).toarray()
train_sent_vectors = [self.sentiment_scores(instance) for instance in train_instances]
train_indep_vectors = np.concatenate((train_bow_vectors, train_sent_vectors), axis=1)
# dep features:
self.fc = FeatureCounter()
train_dep_vectors = self.get_dep_vectors(train_instances, learning=True)
# store vectors for training set:
train_vectors = np.concatenate((train_indep_vectors, train_dep_vectors), axis=1)
train_vectors = self.transformer.fit_transform(train_vectors).toarray()
for i in range(len(train_instances)):
train_instances[i].vector = train_vectors[i]
# print length of feature vector
print(len(train_vectors[0]))
# used in vectorizing test set
def vectorize(self, instances):
# indep features:
texts = [instance.text for instance in instances]
bow_vectors = self.bow_vectorizer.transform(texts).toarray()
sent_vectors = [self.sentiment_scores(instance) for instance in instances]
indep_vectors = np.concatenate((bow_vectors, sent_vectors), axis=1)
# dep features:
dep_vectors = self.get_dep_vectors(instances, learning=False)
# store vectors:
vectors = np.concatenate((indep_vectors, dep_vectors), axis=1)
        vectors = self.transformer.transform(vectors).toarray()  # reuse the transformer fitted on the training data
for i in range(len(instances)):
instances[i].vector = vectors[i]
def sentiment_scores(self, instance):
tokens = instance.text.split(' ')
pos = 0
neg = 0
for token in tokens:
if token in self.sentiment_lexicon.index:
mean, std = self.sentiment_lexicon.loc[token]
if mean < 0:
neg += abs(mean)
else:
pos += abs(mean)
return [pos / len(tokens), neg / len(tokens)]
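    # Example with hypothetical lexicon values: for 'great camera', if 'great' has mean 1.2
    # and 'camera' is absent from the lexicon, the method returns [1.2 / 2, 0.0] = [0.6, 0.0].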
def get_dep_vectors(self, instances, learning):
feature_indices = []
for instance in instances:
features = self.get_dep_features(instance)
feature_indices.append(set([self.fc.indexof(feature, learning) for feature in features]))
        # binary indicator vectors of length fc.count(), plus one trailing bias column
dep_vectors = np.zeros((len(instances), self.fc.count() + 1))
print(dep_vectors.shape)
for i in range(len(instances)):
for feature_index in feature_indices[i]:
                if feature_index is not None:  # 0 is a valid index; None means the feature was unseen at training time
dep_vectors[i][feature_index] = 1
            dep_vectors[i][-1] = 1  # set the trailing bias column
return dep_vectors
def is_verb(self, t):
return (type(t) is Tree and
t.label() in ['VB', 'VBP', 'VBD', 'VBZ'] and
not self.is_copula(t))
def is_copula(self, t):
return (type(t) is Tree and
t.label() in ['VBD', 'VBZ'] and
t.leaves()[0] in self.copulas)
def has_arg(self, t):
return type(t) is Tree and t.label() == 'NP' and 'ARG' in t.leaves()
def is_arg_phrase(self, t):
return type(t) is Tree and t.label() == 'NP' and 'ARG' in t
def is_negation(self, t):
return type(t) is Tree and len(t.leaves()) == 1 and t.leaves()[0] in self.negations
def is_adjective(self, t):
return type(t) is Tree and t.label() in ['JJ', 'JJR', 'JJS', 'ADJP']
def is_particle(self, t):
return type(t) is Tree and t.label() == 'PRT'
def is_noun(self, t):
return type(t) is Tree and t.label() in ['NN', 'NP']
def is_adverb(self, t):
return (type(t) is Tree and t.label() in ['ADVP', 'RB', 'RBR', 'RBS'] and
not any(w.lower() in self.negations for w in t.leaves()))
def is_copula_phrase(self, t):
return type(t) is Tree and t.label() == 'VP' and self.is_copula(t[0])
def preceded_by_adverb(self, t):
p = t.left_sibling()
return p and p.label() in ['RB', 'RBR', 'RBS']
def has_noun(self, t):
        return any(st.label() == 'NP' for st in t.subtrees())
def left_sibling_negation(self, t):
n = ''
left_sibling = t.left_sibling()
while left_sibling:
if self.is_negation(left_sibling):
n = 'neg-' + n
left_sibling = left_sibling.left_sibling()
return n
def sibling_negation(self, t):
n = ''
left_sibling = t.left_sibling()
while left_sibling:
if self.is_negation(left_sibling):
n = 'neg-' + n
left_sibling = left_sibling.left_sibling()
right_sibling = t.right_sibling()
while right_sibling:
if self.is_negation(right_sibling):
n = 'neg-' + n
right_sibling = right_sibling.right_sibling()
return n
    # returns a copy of t with SBAR sub-clauses removed
    def main_phrase(self, t):
        children = []
        for n in t:
            if type(n) is Tree:
                # keep non-clausal children, dropping any SBAR sub-clause
                if n.label() != 'SBAR':
                    children.append(self.main_phrase(n))
            else:
                # pre-terminal: return the leaf word itself
                return n
        return Tree(t.label(), children)
# returns left-most node of t in ['ADVP', 'ADJP', 'NP']
def first_descriptive_phrase(self, t):
for n in t:
if n.label() in ['ADVP', 'ADJP', 'NP']:
return n
return None
def get_descriptive_nodes(self, t):
l = []
for n in t:
if type(n) is Tree:
                if n.label() in self.adjectives_and_nouns:
l.append(n)
else:
l += self.get_descriptive_nodes(n)
return l
def get_dep_features(self, instance):
features = []
for subtree in instance.tree.subtrees():
if subtree.label() == 'VP':
for i in range(len(subtree)-1):
if (self.is_verb(subtree[i]) and
(self.has_arg(subtree[i+1]) or
(i + 2 < len(subtree) and self.is_particle(subtree[i+1]) and self.has_arg(subtree[i+2])))):
# rule 1: transitive verb with ARG as its object
# modified with inclusion of particles before and after ARG
arg_pos = i + 1
f = self.left_sibling_negation(subtree)
f = f + self.ps.stem(subtree[i].leaves()[0].lower())
if self.is_particle(subtree[i+1]):
f = f + '-' + subtree[i+1].leaves()[0].lower()
arg_pos = i + 2
if arg_pos + 1 < len(subtree) and self.is_particle(subtree[arg_pos+1]):
f = f + '-' + subtree[arg_pos+1].leaves()[0].lower()
f = f + '_arg2'
features.append(f)
for i in range(len(subtree)-1):
if self.has_arg(subtree[i]) and subtree[i+1].label() == 'VP':
for j in range(len(subtree[i+1])):
if self.is_verb(subtree[i+1][j]):
if self.has_noun(subtree[i+1]):
# rule 2: transitive verb with ARG as its subject
f = self.sibling_negation(subtree[i+1][j]) + self.ps.stem(subtree[i+1][j].leaves()[0].lower()) + '_arg1'
features.append(f)
else:
# rule 3: intransitive verb with ARG as its subject
f = self.sibling_negation(subtree[i+1][j]) + self.ps.stem(subtree[i+1][j].leaves()[0].lower()) + '_it_arg1'
features.append(f)
if j + 1 < len(subtree[i+1]) and self.is_adverb(subtree[i+1][j+1]):
# rule 7: adverb modifies verb with ARG as subject
main = self.main_phrase(subtree[i+1][j+1])
f = 'arg1_v_' + self.sibling_negation(subtree[i+1][j+1]) + self.ps.stem('-'.join(main.leaves()).lower())
features.append(f)
if self.is_arg_phrase(subtree):
for i in range(len(subtree)-1):
if self.is_adjective(subtree[i]):
# rule 4.1: adjective with ARG as its head
f = self.ps.stem('-'.join(subtree[i].leaves()).lower()) + '_arg1'
if self.preceded_by_adverb(subtree[i]):
# addition to rules: include preceding adverb
f = subtree[i-1].leaves()[0] + '-' + f
f = self.left_sibling_negation(subtree[i]) + f
features.append(f)
                    if self.is_noun(subtree[i]) and 'ARG' not in subtree[i].leaves():
                        # rule 4.2: noun modifier with ARG as its head (skip the ARG placeholder itself)
f = self.left_sibling_negation(subtree[i]) + self.ps.stem('-'.join(subtree[i].leaves()).lower()) + '_arg1'
features.append(f)
if self.is_copula_phrase(subtree.right_sibling()):
p = self.first_descriptive_phrase(subtree.right_sibling())
if p:
for n in self.get_descriptive_nodes(p):
# rule 5: adjective or noun connected by a copula with ARG
# extended to include adverb phrases, as these tend to include an adjective
f = self.sibling_negation(subtree.right_sibling()[0]) + self.ps.stem('-'.join(n.leaves())) + '_cp_arg1'
features.append(f)
return features
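    # Worked example for rule 1 (illustrative sentence, not from the data): for the labelled
    # tree (S (NP (PRP I)) (VP (VBP love) (NP ARG))), the VP child 'love' is a non-copular
    # verb followed by an NP containing ARG, so get_dep_features returns ['love_arg2']
    # (Porter-stemmed verb plus the '_arg2' marker, with no negation prefix).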
from nltk.tokenize import sent_tokenize
import pandas as pd
import re
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from anytree import PostOrderIter
from functools import reduce
from matplotlib import pyplot
from scipy.stats import pearsonr
from sklearn.metrics import mean_absolute_error
import pickle
from server.agent.review_tokenizer import tokenize_review, reduce_noise
from server.agent.argument import *
reviewables = [camera, image, video, battery, flash, audio, price, shipping, lens, zoom, af]
features = [image, video, battery, flash, audio, price, shipping, lens, zoom, af]
glossary = {
camera: ['camera', 'device', 'product'],
image: ['image', 'picture', ' pic '],
video: ['video'],
battery: ['battery'],
flash: ['flash'],
audio: ['audio', 'sound'],
price: ['price', 'value', 'cost'],
shipping: ['ship']
}
sentiment_threshold = 0.95
f = open('camera_review_classifier.pickle', 'rb')
classifier = pickle.load(f)
f.close()
# extract phrases
def extract_phrases(review_body):
sentences = sent_tokenize(review_body)
phrases = []
for sentence in sentences:
phrases += re.split(' but | although | though | otherwise | however | unless | whereas | despite |<br />', sentence)
return phrases
# analyze sentiment
analyzer = SentimentIntensityAnalyzer()
def get_sentiment(phrase):
# get vader score
# vader_s = analyzer.polarity_scores(phrase)
# compound_s = vader_s['compound']
# get classification
tokens = reduce_noise(tokenize_review(phrase))
prob_classification = classifier.prob_classify(dict([token, True] for token in tokens))
# return compound_s if the two agree
# if (classification == '+' and compound_s > 0) or (classification == '-' and compound_s < 0):
# return compound_s
# else:
# return 0
classification = prob_classification.max()
strength = (prob_classification.prob(classification) - 0.5) * 2
return strength if classification == '+' else -strength
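# Example: a classification probability of 0.98 for '+' gives strength (0.98 - 0.5) * 2 = 0.96,
# which exceeds sentiment_threshold = 0.95; a probability of 0.6 gives only 0.2 and the phrase
# is discarded by extract_votes below.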
# remove all ancestors of node in list l
def remove_ancestors(node, l):
    if node.parent is not None:
try:
l.remove(node.parent)
except ValueError:
pass
remove_ancestors(node.parent, l)
# get reviewable(s) that match phrase
def get_reviewables(phrase):
reviewable_matches = []
reviewables = [node for node in PostOrderIter(camera)]
while len(reviewables) > 0:
f = reviewables.pop(0)
        for word in glossary.get(f, []):  # some reviewables (e.g. lens, zoom, af) have no glossary entry yet
if word in phrase:
reviewable_matches.append(f)
remove_ancestors(f, reviewables)
break
return reviewable_matches
def extract_votes(phrases):
votes = {}
for phrase in phrases:
reviewables = get_reviewables(phrase)
sentiment = get_sentiment(phrase)
if abs(sentiment) > sentiment_threshold:
for reviewable in reviewables:
if (reviewable not in votes) or (abs(votes[reviewable]) < abs(sentiment)):
                    votes[reviewable] = sentiment  # if several phrases mention the same reviewable, the strongest sentiment wins
# normalize votes to 1 (+) or -1 (-)
for reviewable in votes:
votes[reviewable] = 1 if votes[reviewable] > 0 else -1
return votes
# augment votes (Definition 4.3) obtained for a single critic
def augment_votes(votes):
reviewables = [node for node in PostOrderIter(camera)]
for reviewable in reviewables:
if reviewable not in votes:
polar_sum = 0
for subfeat in reviewable.children:
if subfeat in votes:
polar_sum += votes[subfeat]
if polar_sum != 0:
votes[reviewable] = 1 if polar_sum > 0 else -1
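# Example (Definition 4.3, hypothetical votes): if a review votes {image: 1, video: 1, battery: -1}
# but not on camera itself, camera's children sum to +1, so camera receives an augmented vote of 1.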
def get_qbaf(ra, review_count):
# sums of all positive and negative votes for reviewables
reviewable_sums = {}
for reviewable in reviewables:
reviewable_sums[reviewable] = 0
for r in ra:
if r['reviewable'] == reviewable:
reviewable_sums[reviewable] += r['vote']
# calculate attack/support relations for camera
supporters = {r: [] for r in reviewables}
attackers = {r: [] for r in reviewables}
for r in reviewables:
for subf in r.children:
if reviewable_sums[subf] > 0:
supporters[r].append(subf)
elif reviewable_sums[subf] < 0:
attackers[r].append(subf)
# calculate base scores for reviewables
base_scores = {}
base_scores[camera] = 0.5 + 0.5 * reviewable_sums[camera] / review_count
for feature in features:
base_scores[feature] = abs(reviewable_sums[feature]) / review_count
qbaf = {"supporters": supporters, "attackers": attackers, "base_scores": base_scores}
return qbaf
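# Example with hypothetical sums over 20 reviews: a camera vote sum of +10 gives
# base_scores[camera] = 0.5 + 0.5 * 10/20 = 0.75, while a battery sum of -5 gives
# base_scores[battery] = |-5| / 20 = 0.25 (features use magnitude only; polarity is
# carried by the attack/support relations).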
def combined_strength(args):
if len(args) != 0:
return 1 - reduce(lambda x, y: x * y, map(lambda v: 1 - v, args))
return 0
def argument_strength(base_score, attacker_strengths, supporter_strengths):
attack = combined_strength(attacker_strengths)
support = combined_strength(supporter_strengths)
if attack > support:
return base_score - (base_score * abs(attack - support))
elif attack < support:
return base_score + ((1 - base_score) * abs(attack - support))
return base_score
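# Worked example of the DF-QUAD combination (hypothetical strengths): attackers [0.4, 0.5]
# give combined_strength = 1 - (1 - 0.4) * (1 - 0.5) = 0.7; with no supporters and base score
# 0.5, argument_strength(0.5, [0.4, 0.5], []) = 0.5 - 0.5 * 0.7 = 0.15.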
# apply DF-QUAD gradual semantics to qbaf
def get_strengths(qbaf):
strengths = {}
reviewables = [node for node in PostOrderIter(camera)]
for reviewable in reviewables:
attacker_strengths = []
supporter_strengths = []
for child in reviewable.children:
if child in qbaf["attackers"][reviewable]:
attacker_strengths.append(strengths[child])
elif child in qbaf["supporters"][reviewable]:
supporter_strengths.append(strengths[child])
strengths[reviewable] = argument_strength(qbaf["base_scores"][reviewable], attacker_strengths, supporter_strengths)
return strengths
#############
all_reviews = pd.read_csv('target_extraction/data/camera_prepared_data.tsv', sep='\t', error_bad_lines=False)
camera_strengths = []
star_rating_averages = []
products_analyzed = 0
grouped = all_reviews.groupby('product_id')
for product_id, reviews in grouped:
# get ra
ra = []
voting_reviews = 0
review_count = 0
star_rating_sum = 0
for _, review in reviews.iterrows():
review_id = review['review_id']
review_count += 1
star_rating_sum += review['star_rating']
phrases = extract_phrases(review['review_body'])
votes = extract_votes(phrases)