File size: 2,907 Bytes
7288748
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from typing import Dict, List
from pathlib import Path
from sqlite3 import Cursor

from utils import accepts_types, create_videos
from preprocessing.youtubevideopreprocessor import YoutubeVideoPreprocessor
from loading.loaderiterator import LoaderIterator
from transforming.batchtransformer import BatchTransformer
from storing.sqlitebatchvideostorer import SQLiteBatchVideoStorer
from storing.sqlitecontextmanager import SQLiteContextManager
from loading.serialization import JsonSerializer
from transforming.addtitletransform import AddTitleTransform
from transforming.adddescriptiontransform import AddDescriptionTransform
from transforming.whispertransform import WhisperTransform

class DataPipeline:
    """Coordinate the three stages of the system: load, transform, store.

    The class holds no processing logic of its own. It pulls batches from the
    loader iterator, turns each batch into videos, runs them through the batch
    transformer and passes the result to the storer together with the cursor
    obtained from the SQLite context manager.
    """

    def __init__(self,
                 loader_iterator: LoaderIterator,
                 batch_transformer: BatchTransformer,
                 storer: SQLiteBatchVideoStorer,
                 sqlite_context_manager: SQLiteContextManager) -> None:
        """Keep references to the collaborating components.

        Args:
            loader_iterator: Iterated over in ``process`` to obtain batches;
                its ``load_paths`` attribute is overwritten on every call.
            batch_transformer: Its ``apply`` method is called once per batch.
            storer: Its ``store`` method receives the cursor and the
                transformed videos.
            sqlite_context_manager: Entered with ``with`` in ``process``;
                the value it yields is used as the database cursor.
        """
        self.sqlite_context_manager = sqlite_context_manager
        self.storer = storer
        self.batch_transformer = batch_transformer
        self.loader_iterator = loader_iterator

    # accepts_types is a project helper; presumably it rejects a non-list
    # ``load_paths`` -- confirm against utils.
    @accepts_types(list)
    def process(self, load_paths: List[Path]) -> None:
        """Run every batch produced for ``load_paths`` through the pipeline.

        The paths are handed to the loader iterator, then the SQLite context
        manager is entered once and stays open while all batches are
        transformed and stored.

        Args:
            load_paths: Paths assigned to the loader iterator's
                ``load_paths`` attribute before iteration starts.
        """
        loader = self.loader_iterator
        loader.load_paths = load_paths
        with self.sqlite_context_manager as cursor:
            for batch in loader:
                self._process_video_batch(cursor, batch)

    def _process_video_batch(self,
                             db_cursor: Cursor,
                             video_data_batch: List[Dict]) -> None:
        """Convert one loaded batch to videos, transform it and store it.

        Args:
            db_cursor: Cursor forwarded unchanged to the storer.
            video_data_batch: Raw batch as yielded by the loader iterator;
                passed to ``create_videos``.
        """
        batch_videos = create_videos(video_data_batch)
        processed = self.batch_transformer.apply(batch_videos)
        self.storer.store(db_cursor, processed)

def create_hardcoded_data_pipeline(db_path, whisper_model: str="base") -> DataPipeline:
    """Assemble a DataPipeline from a fixed selection of components.

    The pipeline loads through a ``LoaderIterator`` backed by a
    ``JsonSerializer``, applies the title, description and Whisper
    transforms in that order, and stores through a
    ``SQLiteBatchVideoStorer`` using a ``SQLiteContextManager`` built
    from ``db_path``.

    Args:
        db_path: Forwarded as-is to ``SQLiteContextManager``.
        whisper_model: Forwarded to ``WhisperTransform`` as ``model``;
            defaults to ``"base"``.

    Returns:
        The wired-up ``DataPipeline``.

    TODO: Create DataPipeline so users can pass the args.
    """
    # NOTE(review): the literal 2 is presumably the batch size -- confirm
    # against LoaderIterator before exposing it as a parameter.
    loader = LoaderIterator(JsonSerializer(), 2)
    # TODO: Let the user select the transforms and their parameters.
    transforms = [
        AddTitleTransform(),
        AddDescriptionTransform(),
        WhisperTransform(model=whisper_model),
    ]
    transformer = BatchTransformer(transforms)
    storer = SQLiteBatchVideoStorer()
    context_manager = SQLiteContextManager(db_path)
    return DataPipeline(loader, transformer, storer, context_manager)