import random
from pathlib import Path

import spacy
from spacy.tokens import DocBin
from spacy.training import Example
from spacy.util import compounding, minibatch

# Location of the serialized training corpus used when no path is given.
DEFAULT_TRAIN_DATA_PATH = "./data/Spacy_data.spacy"

# Every entity label the NER component is asked to learn.
ENTITY_LABELS = (
    "PERSON",
    "CONTACT",
    "EMAIL",
    "ABOUT",
    "EXPERIENCE",
    "YEARS_EXPERIENCE",
    "UNIVERSITY",
    "SOFT_SKILL",
    "INSTITUTE",
    "LAST_QUALIFICATION_YEAR",
    "JOB_TITLE",
    "COMPANY",
    "COURSE",
    "DOB",
    "HOBBIES",
    "LINK",
    "SCHOOL",
    "QUALIFICATION",
    "LANGUAGE",
    "LOCATION",
    "PROJECTS",
    "SKILL",
    "CERTIFICATE",
)


def load_data_from_spacy_file(file_path):
    """Load the documents stored in a ``.spacy`` (DocBin) file.

    Args:
        file_path: Path of the DocBin file to read.

    Returns:
        A list of ``Doc`` objects bound to the vocab of a blank English
        pipeline. Any failure while reading or decoding is reported on
        stdout and an empty list is returned instead of raising.
    """
    # A blank English pipeline supplies the vocab the docs are attached to.
    nlp = spacy.blank("en")

    try:
        doc_bin = DocBin().from_disk(file_path)
        return list(doc_bin.get_docs(nlp.vocab))
    except Exception as e:
        # Best-effort loader: report the problem and let the caller decide.
        print(f"Error loading data from .spacy file: {e}")
        return []


def train_model(epochs, model_path, data_path=DEFAULT_TRAIN_DATA_PATH):
    """Train a blank English NER pipeline and save it to ``model_path``.

    The pipeline consists of an ``ner`` component followed by a
    ``sentencizer``. After each epoch the accumulated NER loss is recorded
    and printed; whenever it improves on the best loss seen so far the
    pipeline is written to ``model_path``. Training stops early once an
    epoch reports a loss of exactly zero.

    Args:
        epochs: Maximum number of passes over the training data.
        model_path: Directory the trained pipeline is written to.
        data_path: ``.spacy`` file holding the annotated training docs.

    Returns:
        A list with the NER loss of every epoch that was run. An epoch in
        which no batch was processed contributes ``float('inf')``.
    """
    nlp = spacy.blank("en")

    # A blank pipeline has no components yet, so "ner" can be added directly.
    ner = nlp.add_pipe("ner")
    nlp.add_pipe("sentencizer")

    for label in ENTITY_LABELS:
        ner.add_label(label)

    train_docs = load_data_from_spacy_file(data_path)

    # Build the Example objects once: they do not change between epochs, so
    # re-tokenizing every text for every batch of every epoch is wasted work.
    examples = [
        Example.from_dict(
            nlp.make_doc(doc.text),
            {
                "entities": [
                    (ent.start_char, ent.end_char, ent.label_)
                    for ent in doc.ents
                ]
            },
        )
        for doc in train_docs
    ]

    # `begin_training` is a deprecated alias of `initialize` in spaCy v3.
    optimizer = nlp.initialize()

    epoch_losses = []
    best_loss = float("inf")
    checkpoint_saved = False

    for epoch in range(epochs):
        losses = {}
        random.shuffle(examples)  # Shuffle data for better training

        batches = minibatch(examples, size=compounding(4.0, 32.0, 1.001))
        for batch in batches:
            nlp.update(batch, sgd=optimizer, drop=0.35, losses=losses)

        current_loss = losses.get("ner", float("inf"))
        epoch_losses.append(current_loss)
        print(f"Losses at epoch {epoch + 1}: {losses}")

        # Keep the weights of the best epoch seen so far on disk.
        if current_loss < best_loss:
            best_loss = current_loss
            nlp.to_disk(model_path)
            checkpoint_saved = True

        # Nothing left to learn from the training data.
        if current_loss == 0:
            break

    # Only write the final state when no checkpoint exists (no epochs run or
    # no batches processed); otherwise this would overwrite the best model
    # with the weights of the last epoch.
    if not checkpoint_saved:
        nlp.to_disk(model_path)

    return epoch_losses