import numpy as np
import pandas as pd
import gradio as gr
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from bertopic._utils import check_documents_type, validate_distance_matrix
from bertopic.plotting._hierarchy import _get_annotations
import plotly.figure_factory as ff
from packaging import version
import math
from umap import UMAP
from typing import List, Union, Callable
from scipy.sparse import csr_matrix
from scipy.cluster import hierarchy as sch
from sklearn.metrics.pairwise import cosine_similarity
from sklearn import __version__ as sklearn_version
from tqdm import tqdm
import itertools
# The following functions are adapted from the original BERTopic implementation by Maarten Grootendorst:
# https://github.com/MaartenGr/BERTopic/blob/master/bertopic/plotting/_documents.py
def visualize_documents_custom(topic_model,
docs: List[str],
hover_labels: List[str],
topics: List[int] = None,
embeddings: np.ndarray = None,
reduced_embeddings: np.ndarray = None,
sample: float = None,
hide_annotations: bool = False,
hide_document_hover: bool = False,
custom_labels: Union[bool, str] = False,
title: str = "<b>Documents and Topics</b>",
width: int = 1200,
height: int = 750, progress=gr.Progress(track_tqdm=True)):
""" Visualize documents and their topics in 2D
Arguments:
topic_model: A fitted BERTopic instance.
docs: The documents you used when calling either `fit` or `fit_transform`
hover_labels: A label per document shown when hovering over its point;
each label is wrapped to roughly 20 words per line (at most 300 words).
topics: A selection of topics to visualize.
Not to be confused with the topics that you get from `.fit_transform`.
For example, if you want to visualize only topics 1 through 5:
`topics = [1, 2, 3, 4, 5]`.
embeddings: The embeddings of all documents in `docs`.
reduced_embeddings: The 2D reduced embeddings of all documents in `docs`.
sample: The percentage of documents in each topic that you would like to keep.
Value can be between 0 and 1. Setting this value to, for example,
0.1 (10% of documents in each topic) makes it easier to visualize
millions of documents as a subset is chosen.
hide_annotations: Hide the names of the traces on top of each cluster.
hide_document_hover: Hide the content of the documents when hovering over
specific points. Helps to speed up generation of visualization.
custom_labels: If bool, whether to use custom topic labels that were defined using
`topic_model.set_topic_labels`.
If `str`, it uses labels from other aspects, e.g., "Aspect1".
title: Title of the plot.
width: The width of the figure.
height: The height of the figure.
Examples:
To visualize the topics simply run:
```python
topic_model.visualize_documents(docs)
```
Do note that this re-calculates the embeddings and reduces them to 2D.
The advised and preferred pipeline for using this function is as follows:
```python
from sklearn.datasets import fetch_20newsgroups
from sentence_transformers import SentenceTransformer
from bertopic import BERTopic
from umap import UMAP
# Prepare embeddings
docs = fetch_20newsgroups(subset='all', remove=('headers', 'footers', 'quotes'))['data']
sentence_model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = sentence_model.encode(docs, show_progress_bar=False)
# Train BERTopic
topic_model = BERTopic().fit(docs, embeddings)
# Reduce dimensionality of embeddings, this step is optional
# reduced_embeddings = UMAP(n_neighbors=10, n_components=2, min_dist=0.0, metric='cosine').fit_transform(embeddings)
# Run the visualization with the original embeddings
topic_model.visualize_documents(docs, embeddings=embeddings)
# Or, if you have reduced the original embeddings already:
topic_model.visualize_documents(docs, reduced_embeddings=reduced_embeddings)
```
Or if you want to save the resulting figure:
```python
fig = topic_model.visualize_documents(docs, reduced_embeddings=reduced_embeddings)
fig.write_html("path/to/file.html")
```
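Because this adapted variant is a standalone function rather than a method, and it
additionally requires `hover_labels`, a minimal sketch of calling it directly looks as
follows (assuming `titles` is a list with one short label per document):
```python
# `titles` is a hypothetical list of short per-document labels
fig = visualize_documents_custom(topic_model, docs, hover_labels=titles,
reduced_embeddings=reduced_embeddings)
```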
<iframe src="../../getting_started/visualization/documents.html"
style="width:1000px; height: 800px; border: 0px;""></iframe>
"""
topic_per_doc = topic_model.topics_
# Add <br> tags to hover labels to get them to appear on multiple lines
def wrap_by_word(s, n):
'''Return a string of up to 300 words with `<br>` inserted after every n words'''
a = s.split()[:300]
ret = ''
for i in range(0, len(a), n):
ret += ' '.join(a[i:i+n]) + '<br>'
return ret
# Apply the function to every element in the list
hover_labels = [wrap_by_word(s, n=20) for s in hover_labels]
# Sample the data to optimize for visualization and dimensionality reduction
if sample is None or sample > 1:
sample = 1
indices = []
for topic in set(topic_per_doc):
s = np.where(np.array(topic_per_doc) == topic)[0]
size = len(s) if len(s) < 100 else int(len(s) * sample)
indices.extend(np.random.choice(s, size=size, replace=False))
indices = np.array(indices)
df = pd.DataFrame({"topic": np.array(topic_per_doc)[indices]})
df["doc"] = [docs[index] for index in indices]
df["hover_labels"] = [hover_labels[index] for index in indices]
df["topic"] = [topic_per_doc[index] for index in indices]
# Extract embeddings if not already done
if sample is None:
if embeddings is None and reduced_embeddings is None:
embeddings_to_reduce = topic_model._extract_embeddings(df.doc.to_list(), method="document")
else:
embeddings_to_reduce = embeddings
else:
if embeddings is not None:
embeddings_to_reduce = embeddings[indices]
elif embeddings is None and reduced_embeddings is None:
embeddings_to_reduce = topic_model._extract_embeddings(df.doc.to_list(), method="document")
# Reduce input embeddings
if reduced_embeddings is None:
umap_model = UMAP(n_neighbors=10, n_components=2, min_dist=0.0, metric='cosine').fit(embeddings_to_reduce)
embeddings_2d = umap_model.embedding_
elif sample is not None and reduced_embeddings is not None:
embeddings_2d = reduced_embeddings[indices]
elif sample is None and reduced_embeddings is not None:
embeddings_2d = reduced_embeddings
unique_topics = set(topic_per_doc)
if topics is None:
topics = unique_topics
# Combine data
df["x"] = embeddings_2d[:, 0]
df["y"] = embeddings_2d[:, 1]
# Prepare text and names
trace_name_char_length = 60
if isinstance(custom_labels, str):
names = [[[str(topic), None]] + topic_model.topic_aspects_[custom_labels][topic] for topic in unique_topics]
names = ["_".join([label[0] for label in labels[:4]]) for labels in names]
names = [label if len(label) < 30 else label[:27] + "..." for label in names]
elif topic_model.custom_labels_ is not None and custom_labels:
# Limit label length to `trace_name_char_length` characters
names = [label[:trace_name_char_length] for label in (topic_model.custom_labels_[topic + topic_model._outliers] for topic in unique_topics)]
else:
# Topic number followed by its top-3 words, limited to `trace_name_char_length` characters
names = [f"{topic} " + ", ".join([word for word, value in topic_model.get_topic(topic)][:3])[:trace_name_char_length] for topic in unique_topics]
# Visualize
fig = go.Figure()
# Outliers and non-selected topics
non_selected_topics = set(unique_topics).difference(topics)
if len(non_selected_topics) == 0:
non_selected_topics = [-1]
selection = df.loc[df.topic.isin(non_selected_topics), :]
selection["text"] = ""
selection.loc[len(selection), :] = [None, None, None, selection.x.mean(), selection.y.mean(), "Other documents"]
fig.add_trace(
go.Scattergl(
x=selection.x,
y=selection.y,
hovertext=selection.hover_labels if not hide_document_hover else None,
hoverinfo="text",
mode='markers+text',
name="other",
showlegend=False,
marker=dict(color='#CFD8DC', size=5, opacity=0.5),
hoverlabel=dict(align='left')
)
)
# Selected topics
for name, topic in zip(names, unique_topics):
if topic in topics and topic != -1:
selection = df.loc[df.topic == topic, :]
selection["text"] = ""
if not hide_annotations:
# Columns are: topic, doc, hover_labels, x, y, text
selection.loc[len(selection), :] = [None, None, None, selection.x.mean(), selection.y.mean(), name]
fig.add_trace(
go.Scattergl(
x=selection.x,
y=selection.y,
hovertext=selection.hover_labels if not hide_document_hover else None,
hoverinfo="text",
text=selection.text,
mode='markers+text',
name=name,
textfont=dict(
size=12,
),
marker=dict(size=5, opacity=0.5),
hoverlabel=dict(align='left')
))
# Add grid in a 'plus' shape
x_range = (df.x.min() - abs((df.x.min()) * .15), df.x.max() + abs((df.x.max()) * .15))
y_range = (df.y.min() - abs((df.y.min()) * .15), df.y.max() + abs((df.y.max()) * .15))
fig.add_shape(type="line",
x0=sum(x_range) / 2, y0=y_range[0], x1=sum(x_range) / 2, y1=y_range[1],
line=dict(color="#CFD8DC", width=2))
fig.add_shape(type="line",
x0=x_range[0], y0=sum(y_range) / 2, x1=x_range[1], y1=sum(y_range) / 2,
line=dict(color="#9E9E9E", width=2))
fig.add_annotation(x=x_range[0], y=sum(y_range) / 2, text="D1", showarrow=False, yshift=10)
fig.add_annotation(y=y_range[1], x=sum(x_range) / 2, text="D2", showarrow=False, xshift=10)
# Stylize layout
fig.update_layout(
template="simple_white",
title={
'text': f"{title}",
'x': 0.5,
'xanchor': 'center',
'yanchor': 'top',
'font': dict(
size=22,
color="Black")
},
hoverlabel_align = 'left',
width=width,
height=height
)
fig.update_xaxes(visible=False)
fig.update_yaxes(visible=False)
return fig
def hierarchical_topics_custom(self,
docs: List[str],
linkage_function: Callable[[csr_matrix], np.ndarray] = None,
distance_function: Callable[[csr_matrix], csr_matrix] = None, progress=gr.Progress(track_tqdm=True)) -> pd.DataFrame:
""" Create a hierarchy of topics
To create this hierarchy, BERTopic needs to be already fitted once.
Then, a hierarchy is calculated on the distance matrix of the c-TF-IDF
representation using `scipy.cluster.hierarchy.linkage`.
Based on that hierarchy, we calculate the topic representation at each
merged step. This is a local representation, as we only assume that the
chosen step is merged and not all others, which typically improves the
topic representation.
Arguments:
docs: The documents you used when calling either `fit` or `fit_transform`
linkage_function: The linkage function to use. Default is:
`lambda x: sch.linkage(x, 'ward', optimal_ordering=True)`
distance_function: The distance function to use on the c-TF-IDF matrix. Default is:
`lambda x: 1 - cosine_similarity(x)`.
You can pass any function that returns either a square matrix of
shape (n_samples, n_samples) with zeros on the diagonal and
non-negative values, or a condensed distance matrix of shape
(n_samples * (n_samples - 1) / 2,) containing the upper-triangular
part of the distance matrix.
Returns:
hierarchical_topics: A dataframe that contains a hierarchy of topics
represented by their parents and their children
Examples:
```python
from bertopic import BERTopic
topic_model = BERTopic()
topics, probs = topic_model.fit_transform(docs)
hierarchical_topics = topic_model.hierarchical_topics(docs)
```
A custom linkage function can be used as follows:
```python
from scipy.cluster import hierarchy as sch
from bertopic import BERTopic
topic_model = BERTopic()
topics, probs = topic_model.fit_transform(docs)
# Hierarchical topics
linkage_function = lambda x: sch.linkage(x, 'ward', optimal_ordering=True)
hierarchical_topics = topic_model.hierarchical_topics(docs, linkage_function=linkage_function)
```
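As this adapted version is defined as a standalone function that takes the fitted model
as its first (`self`) argument, a minimal sketch of calling it directly would be:
```python
hierarchical_topics = hierarchical_topics_custom(topic_model, docs)
```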
"""
check_documents_type(docs)
if distance_function is None:
distance_function = lambda x: 1 - cosine_similarity(x)
if linkage_function is None:
linkage_function = lambda x: sch.linkage(x, 'ward', optimal_ordering=True)
# Calculate distance
embeddings = self.c_tf_idf_[self._outliers:]
X = distance_function(embeddings)
X = validate_distance_matrix(X, embeddings.shape[0])
# Use the 1-D condensed distance matrix as an input instead of the raw distance matrix
Z = linkage_function(X)
# Calculate basic bag-of-words to be iteratively merged later
documents = pd.DataFrame({"Document": docs,
"ID": range(len(docs)),
"Topic": self.topics_})
documents_per_topic = documents.groupby(['Topic'], as_index=False).agg({'Document': ' '.join})
documents_per_topic = documents_per_topic.loc[documents_per_topic.Topic != -1, :]
clean_documents = self._preprocess_text(documents_per_topic.Document.values)
# Scikit-Learn Deprecation: get_feature_names is deprecated in 1.0
# and will be removed in 1.2. Please use get_feature_names_out instead.
if version.parse(sklearn_version) >= version.parse("1.0.0"):
words = self.vectorizer_model.get_feature_names_out()
else:
words = self.vectorizer_model.get_feature_names()
bow = self.vectorizer_model.transform(clean_documents)
# Extract clusters
hier_topics = pd.DataFrame(columns=["Parent_ID", "Parent_Name", "Topics",
"Child_Left_ID", "Child_Left_Name",
"Child_Right_ID", "Child_Right_Name"])
for index in tqdm(range(len(Z))):
# Find clustered documents
clusters = sch.fcluster(Z, t=Z[index][2], criterion='distance') - self._outliers
nr_clusters = len(clusters)
# Extract first topic we find to get the set of topics in a merged topic
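# Walk down the linkage matrix until we reach an index smaller than the number of
# original topics, i.e. an original (non-merged) topic contained in this merged cluster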
topic = None
val = Z[index][0]
while topic is None:
if val - len(clusters) < 0:
topic = int(val)
else:
val = Z[int(val - len(clusters))][0]
clustered_topics = [i for i, x in enumerate(clusters) if x == clusters[topic]]
# Group bow per cluster, calculate c-TF-IDF and extract words
grouped = csr_matrix(bow[clustered_topics].sum(axis=0))
c_tf_idf = self.ctfidf_model.transform(grouped)
selection = documents.loc[documents.Topic.isin(clustered_topics), :]
selection.Topic = 0
words_per_topic = self._extract_words_per_topic(words, selection, c_tf_idf, calculate_aspects=False)
# Extract parent's name and ID
parent_id = index + len(clusters)
parent_name = ", ".join([x[0] for x in words_per_topic[0]][:5])
# Extract child's name and ID
Z_id = Z[index][0]
child_left_id = Z_id if Z_id - nr_clusters < 0 else Z_id - nr_clusters
if Z_id - nr_clusters < 0:
child_left_name = ", ".join([x[0] for x in self.get_topic(Z_id)][:5])
else:
child_left_name = hier_topics.iloc[int(child_left_id)].Parent_Name
# Extract child's name and ID
Z_id = Z[index][1]
child_right_id = Z_id if Z_id - nr_clusters < 0 else Z_id - nr_clusters
if Z_id - nr_clusters < 0:
child_right_name = ", ".join([x[0] for x in self.get_topic(Z_id)][:5])
else:
child_right_name = hier_topics.iloc[int(child_right_id)].Parent_Name
# Save results
hier_topics.loc[len(hier_topics), :] = [parent_id, parent_name,
clustered_topics,
int(Z[index][0]), child_left_name,
int(Z[index][1]), child_right_name]
hier_topics["Distance"] = Z[:, 2]
hier_topics = hier_topics.sort_values("Parent_ID", ascending=False)
hier_topics[["Parent_ID", "Child_Left_ID", "Child_Right_ID"]] = hier_topics[["Parent_ID", "Child_Left_ID", "Child_Right_ID"]].astype(str)
return hier_topics
def visualize_hierarchy_custom(topic_model,
orientation: str = "left",
topics: List[int] = None,
top_n_topics: int = None,
custom_labels: Union[bool, str] = False,
title: str = "<b>Hierarchical Clustering</b>",
width: int = 1000,
height: int = 600,
hierarchical_topics: pd.DataFrame = None,
linkage_function: Callable[[csr_matrix], np.ndarray] = None,
distance_function: Callable[[csr_matrix], csr_matrix] = None,
color_threshold: int = 1) -> go.Figure:
""" Visualize a hierarchical structure of the topics
A ward linkage function is used to perform the
hierarchical clustering based on the cosine distance
matrix between topic embeddings.
Arguments:
topic_model: A fitted BERTopic instance.
orientation: The orientation of the figure.
Either 'left' or 'bottom'
topics: A selection of topics to visualize
top_n_topics: Only select the top n most frequent topics
custom_labels: If bool, whether to use custom topic labels that were defined using
`topic_model.set_topic_labels`.
If `str`, it uses labels from other aspects, e.g., "Aspect1".
NOTE: Custom labels are only generated for the original
un-merged topics.
title: Title of the plot.
width: The width of the figure. Only works if orientation is set to 'left'
height: The height of the figure. Only works if orientation is set to 'bottom'
hierarchical_topics: A dataframe that contains a hierarchy of topics
represented by their parents and their children.
NOTE: The hierarchical topic names are only visualized
if both `topics` and `top_n_topics` are not set.
linkage_function: The linkage function to use. Default is:
`lambda x: sch.linkage(x, 'ward', optimal_ordering=True)`
NOTE: Make sure to use the same `linkage_function` as used
in `topic_model.hierarchical_topics`.
distance_function: The distance function to use on the c-TF-IDF matrix. Default is:
`lambda x: 1 - cosine_similarity(x)`.
You can pass any function that returns either a square matrix of
shape (n_samples, n_samples) with zeros on the diagonal and
non-negative values, or a condensed distance matrix of shape
(n_samples * (n_samples - 1) / 2,) containing the upper-triangular
part of the distance matrix.
NOTE: Make sure to use the same `distance_function` as used
in `topic_model.hierarchical_topics`.
color_threshold: Value at which the separation of clusters is made, resulting
in different colors for different clusters.
A higher value typically leads to fewer colored clusters.
Returns:
fig: A plotly figure
Examples:
To visualize the hierarchical structure of
topics simply run:
```python
topic_model.visualize_hierarchy()
```
If you also want the labels visualized of hierarchical topics,
run the following:
```python
# Extract hierarchical topics and their representations
hierarchical_topics = topic_model.hierarchical_topics(docs)
# Visualize these representations
topic_model.visualize_hierarchy(hierarchical_topics=hierarchical_topics)
```
If you want to save the resulting figure:
```python
fig = topic_model.visualize_hierarchy()
fig.write_html("path/to/file.html")
```
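This adapted version is a standalone function, so a minimal sketch of calling it directly
(passing the fitted model explicitly) would be:
```python
fig = visualize_hierarchy_custom(topic_model, hierarchical_topics=hierarchical_topics)
```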
<iframe src="../../getting_started/visualization/hierarchy.html"
style="width:1000px; height: 680px; border: 0px;""></iframe>
"""
if distance_function is None:
distance_function = lambda x: 1 - cosine_similarity(x)
if linkage_function is None:
linkage_function = lambda x: sch.linkage(x, 'ward', optimal_ordering=True)
# Select topics based on top_n and topics args
freq_df = topic_model.get_topic_freq()
freq_df = freq_df.loc[freq_df.Topic != -1, :]
if topics is not None:
topics = list(topics)
elif top_n_topics is not None:
topics = sorted(freq_df.Topic.to_list()[:top_n_topics])
else:
topics = sorted(freq_df.Topic.to_list())
# Select embeddings
all_topics = sorted(list(topic_model.get_topics().keys()))
indices = np.array([all_topics.index(topic) for topic in topics])
# Select topic embeddings
if topic_model.c_tf_idf_ is not None:
embeddings = topic_model.c_tf_idf_[indices]
else:
embeddings = np.array(topic_model.topic_embeddings_)[indices]
# Annotations
if hierarchical_topics is not None and len(topics) == len(freq_df.Topic.to_list()):
annotations = _get_annotations(topic_model=topic_model,
hierarchical_topics=hierarchical_topics,
embeddings=embeddings,
distance_function=distance_function,
linkage_function=linkage_function,
orientation=orientation,
custom_labels=custom_labels)
else:
annotations = None
# wrap distance function to validate input and return a condensed distance matrix
distance_function_viz = lambda x: validate_distance_matrix(
distance_function(x), embeddings.shape[0])
# Create dendrogram
fig = ff.create_dendrogram(embeddings,
orientation=orientation,
distfun=distance_function_viz,
linkagefun=linkage_function,
hovertext=annotations,
color_threshold=color_threshold)
# Create nicer labels
axis = "yaxis" if orientation == "left" else "xaxis"
if isinstance(custom_labels, str):
new_labels = [[[str(x), None]] + topic_model.topic_aspects_[custom_labels][x] for x in fig.layout[axis]["ticktext"]]
new_labels = [", ".join([label[0] for label in labels[:4]]) for labels in new_labels]
new_labels = [label if len(label) < 30 else label[:27] + "..." for label in new_labels]
elif topic_model.custom_labels_ is not None and custom_labels:
new_labels = [topic_model.custom_labels_[topics[int(x)] + topic_model._outliers] for x in fig.layout[axis]["ticktext"]]
else:
new_labels = [[[str(topics[int(x)]), None]] + topic_model.get_topic(topics[int(x)])
for x in fig.layout[axis]["ticktext"]]
new_labels = [", ".join([label[0] for label in labels[:4]]) for labels in new_labels]
new_labels = [label if len(label) < 30 else label[:27] + "..." for label in new_labels]
# Stylize layout
fig.update_layout(
plot_bgcolor='#ECEFF1',
template="plotly_white",
title={
'text': f"{title}",
'x': 0.5,
'xanchor': 'center',
'yanchor': 'top',
'font': dict(
size=22,
color="Black")
},
hoverlabel=dict(
bgcolor="white",
font_size=16,
font_family="Rockwell"
),
)
# Stylize orientation
if orientation == "left":
fig.update_layout(height=200 + (15 * len(topics)),
width=width,
yaxis=dict(tickmode="array",
ticktext=new_labels))
# Fix empty space on the bottom of the graph
y_max = max([trace['y'].max() + 5 for trace in fig['data']])
y_min = min([trace['y'].min() - 5 for trace in fig['data']])
fig.update_layout(yaxis=dict(range=[y_min, y_max]))
else:
fig.update_layout(width=200 + (15 * len(topics)),
height=height,
xaxis=dict(tickmode="array",
ticktext=new_labels))
if hierarchical_topics is not None:
for index in [0, 3]:
axis = "x" if orientation == "left" else "y"
xs = [data["x"][index] for data in fig.data if (data["text"] and data[axis][index] > 0)]
ys = [data["y"][index] for data in fig.data if (data["text"] and data[axis][index] > 0)]
hovertext = [data["text"][index] for data in fig.data if (data["text"] and data[axis][index] > 0)]
fig.add_trace(go.Scatter(x=xs, y=ys, marker_color='black',
hovertext=hovertext, hoverinfo="text",
mode='markers', showlegend=False))
return fig
def visualize_hierarchical_documents_custom(topic_model,
docs: List[str],
hover_labels: List[str],
hierarchical_topics: pd.DataFrame,
topics: List[int] = None,
embeddings: np.ndarray = None,
reduced_embeddings: np.ndarray = None,
sample: Union[float, int] = None,
hide_annotations: bool = False,
hide_document_hover: bool = True,
nr_levels: int = 10,
level_scale: str = 'linear',
custom_labels: Union[bool, str] = False,
title: str = "<b>Hierarchical Documents and Topics</b>",
width: int = 1200,
height: int = 750, progress=gr.Progress(track_tqdm=True)) -> go.Figure:
""" Visualize documents and their topics in 2D at different levels of hierarchy
Arguments:
topic_model: A fitted BERTopic instance.
docs: The documents you used when calling either `fit` or `fit_transform`
hover_labels: A label per document shown when hovering over its point;
each label is wrapped to roughly 20 words per line (at most 300 words).
hierarchical_topics: A dataframe that contains a hierarchy of topics
represented by their parents and their children
topics: A selection of topics to visualize.
Not to be confused with the topics that you get from `.fit_transform`.
For example, if you want to visualize only topics 1 through 5:
`topics = [1, 2, 3, 4, 5]`.
embeddings: The embeddings of all documents in `docs`.
reduced_embeddings: The 2D reduced embeddings of all documents in `docs`.
sample: The percentage of documents in each topic that you would like to keep.
Value can be between 0 and 1. Setting this value to, for example,
0.1 (10% of documents in each topic) makes it easier to visualize
millions of documents as a subset is chosen.
hide_annotations: Hide the names of the traces on top of each cluster.
hide_document_hover: Hide the content of the documents when hovering over
specific points. Helps to speed up generation of visualizations.
nr_levels: The number of levels to be visualized in the hierarchy. First, the distances
in `hierarchical_topics.Distance` are split in `nr_levels` lists of distances.
Then, for each list of distances, the merged topics are selected that have a
distance less or equal to the maximum distance of the selected list of distances.
NOTE: To get all possible merged steps, make sure that `nr_levels` is equal to
the length of `hierarchical_topics`.
level_scale: Whether to apply a linear or logarithmic ('log') scale to the levels of the
distance vector. Linear scaling performs an equal number of merges at each level,
while logarithmic scaling performs more merges in earlier levels to provide
more resolution at higher levels (useful when the number of topics is large).
custom_labels: If bool, whether to use custom topic labels that were defined using
`topic_model.set_topic_labels`.
If `str`, it uses labels from other aspects, e.g., "Aspect1".
NOTE: Custom labels are only generated for the original
un-merged topics.
title: Title of the plot.
width: The width of the figure.
height: The height of the figure.
Examples:
To visualize the topics simply run:
```python
topic_model.visualize_hierarchical_documents(docs, hierarchical_topics)
```
Do note that this re-calculates the embeddings and reduces them to 2D.
The advised and preferred pipeline for using this function is as follows:
```python
from sklearn.datasets import fetch_20newsgroups
from sentence_transformers import SentenceTransformer
from bertopic import BERTopic
from umap import UMAP
# Prepare embeddings
docs = fetch_20newsgroups(subset='all', remove=('headers', 'footers', 'quotes'))['data']
sentence_model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = sentence_model.encode(docs, show_progress_bar=False)
# Train BERTopic and extract hierarchical topics
topic_model = BERTopic().fit(docs, embeddings)
hierarchical_topics = topic_model.hierarchical_topics(docs)
# Reduce dimensionality of embeddings, this step is optional
# reduced_embeddings = UMAP(n_neighbors=10, n_components=2, min_dist=0.0, metric='cosine').fit_transform(embeddings)
# Run the visualization with the original embeddings
topic_model.visualize_hierarchical_documents(docs, hierarchical_topics, embeddings=embeddings)
# Or, if you have reduced the original embeddings already:
topic_model.visualize_hierarchical_documents(docs, hierarchical_topics, reduced_embeddings=reduced_embeddings)
```
Or if you want to save the resulting figure:
```python
fig = topic_model.visualize_hierarchical_documents(docs, hierarchical_topics, reduced_embeddings=reduced_embeddings)
fig.write_html("path/to/file.html")
```
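Unlike the original method, this adapted standalone variant also requires `hover_labels`
and returns the figure together with two dataframes (the per-document topic levels and
the topic names). A minimal sketch, assuming `titles` holds one short label per document:
```python
# `titles` is a hypothetical list of short per-document labels
fig, hierarchy_topics_df, topic_names = visualize_hierarchical_documents_custom(
topic_model, docs, hover_labels=titles, hierarchical_topics=hierarchical_topics,
reduced_embeddings=reduced_embeddings)
```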
NOTE:
This visualization was inspired by the scatter plot representation of Doc2Map:
https://github.com/louisgeisler/Doc2Map
<iframe src="../../getting_started/visualization/hierarchical_documents.html"
style="width:1000px; height: 770px; border: 0px;""></iframe>
"""
topic_per_doc = topic_model.topics_
# Add <br> tags to hover labels to get them to appear on multiple lines
def wrap_by_word(s, n):
'''Return a string of up to 300 words with `<br>` inserted after every n words'''
a = s.split()[:300]
ret = ''
for i in range(0, len(a), n):
ret += ' '.join(a[i:i+n]) + '<br>'
return ret
# Apply the function to every element in the list
hover_labels = [wrap_by_word(s, n=20) for s in hover_labels]
# Sample the data to optimize for visualization and dimensionality reduction
if sample is None or sample > 1:
sample = 1
indices = []
for topic in set(topic_per_doc):
s = np.where(np.array(topic_per_doc) == topic)[0]
size = len(s) if len(s) < 100 else int(len(s)*sample)
indices.extend(np.random.choice(s, size=size, replace=False))
indices = np.array(indices)
df = pd.DataFrame({"topic": np.array(topic_per_doc)[indices]})
df["doc"] = [docs[index] for index in indices]
df["hover_labels"] = [hover_labels[index] for index in indices]
df["topic"] = [topic_per_doc[index] for index in indices]
# Extract embeddings if not already done
if sample is None:
if embeddings is None and reduced_embeddings is None:
embeddings_to_reduce = topic_model._extract_embeddings(df.doc.to_list(), method="document")
else:
embeddings_to_reduce = embeddings
else:
if embeddings is not None:
embeddings_to_reduce = embeddings[indices]
elif embeddings is None and reduced_embeddings is None:
embeddings_to_reduce = topic_model._extract_embeddings(df.doc.to_list(), method="document")
# Reduce input embeddings
if reduced_embeddings is None:
umap_model = UMAP(n_neighbors=10, n_components=2, min_dist=0.0, metric='cosine').fit(embeddings_to_reduce)
embeddings_2d = umap_model.embedding_
elif sample is not None and reduced_embeddings is not None:
embeddings_2d = reduced_embeddings[indices]
elif sample is None and reduced_embeddings is not None:
embeddings_2d = reduced_embeddings
# Combine data
df["x"] = embeddings_2d[:, 0]
df["y"] = embeddings_2d[:, 1]
# Create topic list for each level, levels are created by calculating the distance
distances = hierarchical_topics.Distance.to_list()
if level_scale == 'log' or level_scale == 'logarithmic':
log_indices = np.round(np.logspace(start=math.log(1,10), stop=math.log(len(distances)-1,10), num=nr_levels)).astype(int).tolist()
log_indices.reverse()
max_distances = [distances[i] for i in log_indices]
elif level_scale == 'lin' or level_scale == 'linear':
max_distances = [distances[indices[-1]] for indices in np.array_split(range(len(hierarchical_topics)), nr_levels)][::-1]
else:
raise ValueError("level_scale needs to be one of 'log' or 'linear'")
for index, max_distance in enumerate(max_distances):
# Get topics below `max_distance`
mapping = {topic: topic for topic in df.topic.unique()}
selection = hierarchical_topics.loc[hierarchical_topics.Distance <= max_distance, :]
selection.Parent_ID = selection.Parent_ID.astype(int)
selection = selection.sort_values("Parent_ID")
for row in selection.iterrows():
for topic in row[1].Topics:
mapping[topic] = row[1].Parent_ID
# Make sure the mappings are mapped 1:1
mappings = [True for _ in mapping]
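# Repeatedly follow parent pointers so that every topic maps directly to its top-level
# parent at this distance threshold (chains like a -> b -> c collapse to a -> c)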
while any(mappings):
for i, (key, value) in enumerate(mapping.items()):
if value in mapping.keys() and key != value:
mapping[key] = mapping[value]
else:
mappings[i] = False
# Create new column
df[f"level_{index+1}"] = df.topic.map(mapping)
df[f"level_{index+1}"] = df[f"level_{index+1}"].astype(int)
# Prepare topic names of original and merged topics
trace_names = []
topic_names = {}
trace_name_char_length = 60
for topic in range(hierarchical_topics.Parent_ID.astype(int).max()):
if topic < hierarchical_topics.Parent_ID.astype(int).min():
if topic_model.get_topic(topic):
if isinstance(custom_labels, str):
trace_name = f"{topic} " + ", ".join(list(zip(*topic_model.topic_aspects_[custom_labels][topic]))[0][:5])
elif topic_model.custom_labels_ is not None and custom_labels:
trace_name = topic_model.custom_labels_[topic + topic_model._outliers]
else:
trace_name = f"{topic} " + ", ".join([word[:20] for word, _ in topic_model.get_topic(topic)][:5])
topic_names[topic] = {"trace_name": trace_name[:trace_name_char_length], "plot_text": trace_name[:trace_name_char_length]}
trace_names.append(trace_name)
else:
trace_name = f"{topic} " + hierarchical_topics.loc[hierarchical_topics.Parent_ID == str(topic), "Parent_Name"].values[0]
plot_text = ", ".join([name[:20] for name in trace_name.split(" ")[:5]])
topic_names[topic] = {"trace_name": trace_name[:trace_name_char_length], "plot_text": plot_text[:trace_name_char_length]}
trace_names.append(trace_name)
# Prepare traces
all_traces = []
for level in range(len(max_distances)):
traces = []
# Outliers
if topic_model._outliers:
traces.append(
go.Scattergl(
x=df.loc[(df[f"level_{level+1}"] == -1), "x"],
y=df.loc[df[f"level_{level+1}"] == -1, "y"],
mode='markers+text',
name="other",
hoverinfo="text",
hovertext=df.loc[(df[f"level_{level+1}"] == -1), "hover_labels"] if not hide_document_hover else None,
showlegend=False,
marker=dict(color='#CFD8DC', size=5, opacity=0.5),
hoverlabel=dict(align='left')
)
)
# Selected topics
if topics:
selection = df.loc[(df.topic.isin(topics)), :]
unique_topics = sorted([int(topic) for topic in selection[f"level_{level+1}"].unique()])
else:
unique_topics = sorted([int(topic) for topic in df[f"level_{level+1}"].unique()])
for topic in unique_topics:
if topic != -1:
if topics:
selection = df.loc[(df[f"level_{level+1}"] == topic) &
(df.topic.isin(topics)), :]
else:
selection = df.loc[df[f"level_{level+1}"] == topic, :]
if not hide_annotations:
selection.loc[len(selection), :] = None
selection["text"] = ""
selection.loc[len(selection) - 1, "x"] = selection.x.mean()
selection.loc[len(selection) - 1, "y"] = selection.y.mean()
selection.loc[len(selection) - 1, "text"] = topic_names[int(topic)]["plot_text"]
traces.append(
go.Scattergl(
x=selection.x,
y=selection.y,
text=selection.text if not hide_annotations else None,
hovertext=selection.hover_labels if not hide_document_hover else None,
hoverinfo="text",
name=topic_names[int(topic)]["trace_name"],
mode='markers+text',
marker=dict(size=5, opacity=0.5),
hoverlabel=dict(align='left')
)
)
all_traces.append(traces)
# Track and count traces
nr_traces_per_set = [len(traces) for traces in all_traces]
trace_indices = [(0, nr_traces_per_set[0])]
for index, nr_traces in enumerate(nr_traces_per_set[1:]):
start = trace_indices[index][1]
end = nr_traces + start
trace_indices.append((start, end))
# Visualization
fig = go.Figure()
for traces in all_traces:
for trace in traces:
fig.add_trace(trace)
for index in range(len(fig.data)):
if index >= nr_traces_per_set[0]:
fig.data[index].visible = False
# Create and add slider
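# Each slider step toggles trace visibility so that only the traces belonging to the
# corresponding hierarchy level (its (start, end) range in `trace_indices`) are shown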
steps = []
for index, indices in enumerate(trace_indices):
step = dict(
method="update",
label=str(index),
args=[{"visible": [False] * len(fig.data)}]
)
for index in range(indices[1]-indices[0]):
step["args"][0]["visible"][index+indices[0]] = True
steps.append(step)
sliders = [dict(
currentvalue={"prefix": "Level: "},
pad={"t": 20},
steps=steps
)]
# Add grid in a 'plus' shape
x_range = (df.x.min() - abs((df.x.min()) * .15), df.x.max() + abs((df.x.max()) * .15))
y_range = (df.y.min() - abs((df.y.min()) * .15), df.y.max() + abs((df.y.max()) * .15))
fig.add_shape(type="line",
x0=sum(x_range) / 2, y0=y_range[0], x1=sum(x_range) / 2, y1=y_range[1],
line=dict(color="#CFD8DC", width=2))
fig.add_shape(type="line",
x0=x_range[0], y0=sum(y_range) / 2, x1=x_range[1], y1=sum(y_range) / 2,
line=dict(color="#9E9E9E", width=2))
fig.add_annotation(x=x_range[0], y=sum(y_range) / 2, text="D1", showarrow=False, yshift=10)
fig.add_annotation(y=y_range[1], x=sum(x_range) / 2, text="D2", showarrow=False, xshift=10)
# Stylize layout
fig.update_layout(
sliders=sliders,
template="simple_white",
title={
'text': f"{title}",
'x': 0.5,
'xanchor': 'center',
'yanchor': 'top',
'font': dict(
size=22,
color="Black")
},
width=width,
height=height,
)
fig.update_xaxes(visible=False)
fig.update_yaxes(visible=False)
hierarchy_topics_df = df.filter(regex=r'topic|^level').drop_duplicates(subset="topic")
topic_names = pd.DataFrame(topic_names).T
return fig, hierarchy_topics_df, topic_names
def visualize_barchart_custom(topic_model,
topics: List[int] = None,
top_n_topics: int = 8,
n_words: int = 5,
custom_labels: Union[bool, str] = False,
title: str = "<b>Topic Word Scores</b>",
width: int = 250,
height: int = 250, progress=gr.Progress(track_tqdm=True)) -> go.Figure:
""" Visualize a barchart of selected topics
Arguments:
topic_model: A fitted BERTopic instance.
topics: A selection of topics to visualize.
top_n_topics: Only select the top n most frequent topics.
n_words: Number of words to show in a topic
custom_labels: If bool, whether to use custom topic labels that were defined using
`topic_model.set_topic_labels`.
If `str`, it uses labels from other aspects, e.g., "Aspect1".
title: Title of the plot.
width: The width of each figure.
height: The height of each figure.
Returns:
fig: A plotly figure
Examples:
To visualize the barchart of selected topics
simply run:
```python
topic_model.visualize_barchart()
```
Or if you want to save the resulting figure:
```python
fig = topic_model.visualize_barchart()
fig.write_html("path/to/file.html")
```
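As this adapted version is a standalone function, a minimal sketch of calling it directly
would be:
```python
fig = visualize_barchart_custom(topic_model, top_n_topics=8, n_words=5)
```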
<iframe src="../../getting_started/visualization/bar_chart.html"
style="width:1100px; height: 660px; border: 0px;""></iframe>
"""
colors = itertools.cycle(["#D55E00", "#0072B2", "#CC79A7", "#E69F00", "#56B4E9", "#009E73", "#F0E442"])
# Select topics based on top_n and topics args
freq_df = topic_model.get_topic_freq()
freq_df = freq_df.loc[freq_df.Topic != -1, :]
if topics is not None:
topics = list(topics)
elif top_n_topics is not None:
topics = sorted(freq_df.Topic.to_list()[:top_n_topics])
else:
topics = sorted(freq_df.Topic.to_list()[0:6])
# Initialize figure
if isinstance(custom_labels, str):
subplot_titles = [[[str(topic), None]] + topic_model.topic_aspects_[custom_labels][topic] for topic in topics]
subplot_titles = ["_".join([label[0] for label in labels[:4]]) for labels in subplot_titles]
subplot_titles = [label if len(label) < 30 else label[:27] + "..." for label in subplot_titles]
elif topic_model.custom_labels_ is not None and custom_labels:
subplot_titles = [topic_model.custom_labels_[topic + topic_model._outliers] for topic in topics]
else:
subplot_titles = [f"Topic {topic}" for topic in topics]
columns = 3
rows = int(np.ceil(len(topics) / columns))
fig = make_subplots(rows=rows,
cols=columns,
shared_xaxes=False,
horizontal_spacing=.1,
vertical_spacing=.4 / rows if rows > 1 else 0,
subplot_titles=subplot_titles)
# Add barchart for each topic
row = 1
column = 1
for topic in topics:
words = [word + " " for word, _ in topic_model.get_topic(topic)][:n_words][::-1]
scores = [score for _, score in topic_model.get_topic(topic)][:n_words][::-1]
fig.add_trace(
go.Bar(x=scores,
y=words,
orientation='h',
marker_color=next(colors)),
row=row, col=column)
if column == columns:
column = 1
row += 1
else:
column += 1
# Stylize graph
fig.update_layout(
template="plotly_white",
showlegend=False,
title={
'text': f"{title}",
'x': .5,
'xanchor': 'center',
'yanchor': 'top',
'font': dict(
size=14,
color="Black")
},
width=width*4,
height=height*rows if rows > 1 else height * 1.3,
hoverlabel=dict(
bgcolor="white",
font_size=14,
font_family="Rockwell"
),
)
fig.update_xaxes(showgrid=True)
fig.update_yaxes(showgrid=True)
return fig