File size: 10,774 Bytes
97f04a9
4a4777a
97f04a9
81f1abc
cd0bfbf
737064b
97f04a9
 
737064b
97f04a9
 
c39cd97
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
328e22b
1e87762
4068aa2
c16d7a9
97f04a9
 
c227094
3d51065
 
c16d7a9
f3408d8
c227094
 
f3408d8
c227094
 
328e22b
82321d6
c16d7a9
97f04a9
 
 
f9962fa
c4dc4b2
 
 
 
 
 
 
 
78b647b
f9962fa
81f1abc
 
 
 
e276f3d
 
 
 
 
eab63b5
e276f3d
 
c3ecb1d
 
 
87faff4
c3ecb1d
87faff4
 
 
c3ecb1d
 
 
 
 
 
 
 
 
 
87faff4
c3ecb1d
 
87faff4
c3ecb1d
87faff4
 
 
c3ecb1d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
328e22b
1e87762
4068aa2
3fe716b
 
82321d6
c16d7a9
97f04a9
 
 
737064b
1e87762
4068aa2
737064b
 
 
 
c16d7a9
737064b
 
 
 
4a4777a
1e87762
4068aa2
4a4777a
 
 
 
c16d7a9
4a4777a
 
 
 
 
1e87762
4068aa2
4a4777a
 
 
 
 
c16d7a9
4a4777a
 
 
 
 
 
 
 
1e87762
4068aa2
4a4777a
 
 
 
 
 
c16d7a9
4a4777a
 
 
 
 
 
 
328e22b
c16d7a9
97f04a9
 
 
328e22b
c16d7a9
97f04a9
24c3a19
 
328e22b
24c3a19
328e22b
24c3a19
1e87762
4068aa2
3fe716b
 
 
82321d6
c16d7a9
24c3a19
 
 
f3408d8
 
24c3a19
 
 
328e22b
24c3a19
328e22b
24c3a19
1e87762
4068aa2
3fe716b
 
 
82321d6
c16d7a9
24c3a19
 
 
 
b2aa04f
 
724e315
b2aa04f
 
 
724e315
b2aa04f
 
 
cc30577
 
f9dad3c
 
 
cc30577
 
 
 
 
724e315
cc30577
 
 
328e22b
c16d7a9
25de36d
 
 
 
24c3a19
 
 
328e22b
c16d7a9
25de36d
 
 
 
24c3a19
 
 
328e22b
c16d7a9
25de36d
 
 
 
24c3a19
 
 
328e22b
c16d7a9
25de36d
 
 
 
24c3a19
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# -*- coding: utf-8 -*-
import os
import random
from datetime import datetime
from unittest import mock
from unittest.mock import MagicMock

from pytube import request
from pytube import Stream, streams


@mock.patch("pytube.streams.request")
def test_stream_to_buffer(mock_request, cipher_signature):
    """Streaming three mocked chunks should result in three buffer writes."""
    # Given
    chunk_size = 8 * 1024
    chunk_count = 3
    random_chunks = [bytes(os.urandom(chunk_size)) for _ in range(chunk_count)]
    mock_request.stream.return_value = iter(random_chunks)
    sink = MagicMock()
    # When
    first_stream = cipher_signature.streams[0]
    first_stream.stream_to_buffer(sink)
    # Then
    assert sink.write.call_count == chunk_count


def test_filesize(cipher_signature, mocker):
    """`filesize` should match the content-length returned by the mocked HEAD call."""
    head_mock = mocker.patch.object(request, "head")
    head_mock.return_value = {"content-length": "6796391"}
    first_stream = cipher_signature.streams[0]
    assert first_stream.filesize == 6796391


def test_filesize_approx(cipher_signature, mocker):
    """Check `filesize_approx` both with the fixture's bitrate and with it cleared."""
    head_mock = mocker.patch.object(request, "head")
    head_mock.return_value = {"content-length": "6796391"}
    target = cipher_signature.streams[0]

    # With the bitrate supplied by the fixture data.
    assert target.filesize_approx == 22350604

    # Once the bitrate is cleared, the expected value equals the mocked
    # content-length.
    target.bitrate = None
    assert target.filesize_approx == 6796391


def test_default_filename(cipher_signature):
    """The first stream's default filename should be the expected ``.mp4`` name."""
    first_stream = cipher_signature.streams[0]
    assert first_stream.default_filename == "PSY - GANGNAM STYLE(강남스타일) MV.mp4"


def test_title(cipher_signature):
    """Title is read from `player_config_args`, or from `player_response` once that key is gone."""
    # Case 1: the title is present in the player config args.
    config_title = "title"
    cipher_signature.player_config_args["title"] = config_title
    assert cipher_signature.title == config_title

    # Case 2: the config entry is removed and only videoDetails carries a title.
    response_title = "title2"
    del cipher_signature.player_config_args["title"]
    video_details = {"title": response_title}
    cipher_signature.player_response = {"videoDetails": video_details}
    assert cipher_signature.title == response_title


def test_expiration(cipher_signature):
    """The first stream's expiration should equal the expected datetime."""
    first_stream = cipher_signature.streams[0]
    expected = datetime(2020, 1, 15, 21, 12, 5)
    assert first_stream.expiration == expected


def test_caption_tracks(presigned_video):
    """The presigned fixture should expose 13 caption tracks."""
    track_count = len(presigned_video.caption_tracks)
    assert track_count == 13


def test_captions(presigned_video):
    """The presigned fixture's `captions` should have length 13."""
    caption_count = len(presigned_video.captions)
    assert caption_count == 13


def test_description(cipher_signature):
    """Check the description before and after `player_response` is emptied."""
    # Expected text while the fixture's player_response is intact; note the
    # blank lines ("\n\n") between paragraphs.
    with_player_response = (
        "PSY - ‘I LUV IT’ M/V @ https://youtu.be/Xvjnoagk6GU\n"
        "PSY - ‘New Face’ M/V @https://youtu.be/OwJPPaEyqhI\n\n"
        "PSY - 8TH ALBUM '4X2=8' on iTunes @\n"
        "https://smarturl.it/PSY_8thAlbum\n\n"
        "PSY - GANGNAM STYLE(강남스타일) on iTunes @ http://smarturl.it/PsyGangnam\n\n"
        "#PSY #싸이 #GANGNAMSTYLE #강남스타일\n\n"
        "More about PSY@\nhttp://www.youtube.com/officialpsy\n"
        "http://www.facebook.com/officialpsy\n"
        "http://twitter.com/psy_oppa\n"
        "https://www.instagram.com/42psy42\n"
        "http://iTunes.com/PSY\n"
        "http://sptfy.com/PSY\n"
        "http://weibo.com/psyoppa"
    )
    assert cipher_signature.description == with_player_response

    # Expected text once player_response is emptied: same lines, but joined
    # by single newlines only.
    without_player_response = (
        "PSY - ‘I LUV IT’ M/V @ https://youtu.be/Xvjnoagk6GU\n"
        "PSY - ‘New Face’ M/V @https://youtu.be/OwJPPaEyqhI\n"
        "PSY - 8TH ALBUM '4X2=8' on iTunes @\n"
        "https://smarturl.it/PSY_8thAlbum\n"
        "PSY - GANGNAM STYLE(강남스타일) on iTunes @ http://smarturl.it/PsyGangnam\n"
        "#PSY #싸이 #GANGNAMSTYLE #강남스타일\n"
        "More about PSY@\nhttp://www.youtube.com/officialpsy\n"
        "http://www.facebook.com/officialpsy\n"
        "http://twitter.com/psy_oppa\n"
        "https://www.instagram.com/42psy42\n"
        "http://iTunes.com/PSY\n"
        "http://sptfy.com/PSY\n"
        "http://weibo.com/psyoppa"
    )
    cipher_signature.player_response = {}
    assert cipher_signature.description == without_player_response


def test_rating(cipher_signature):
    """The fixture video's rating should equal the expected value."""
    expected_rating = 4.522203
    assert cipher_signature.rating == expected_rating


def test_length(cipher_signature):
    """The fixture video's length should equal the expected value."""
    expected_length = 252
    assert cipher_signature.length == expected_length


def test_views(cipher_signature):
    """The fixture video's view count should equal the expected value."""
    expected_views = 3494704859
    assert cipher_signature.views == expected_views


def test_download(cipher_signature, mocker):
    """Smoke test: `download()` runs with the request layer and `open` mocked."""
    head_mock = mocker.patch.object(request, "head")
    head_mock.return_value = {"content-length": "16384"}
    stream_mock = mocker.patch.object(request, "stream")
    payload = str(random.getrandbits(8 * 1024))
    stream_mock.return_value = iter([payload])

    fake_open = mock.mock_open()
    with mock.patch("pytube.streams.open", fake_open, create=True):
        first_stream = cipher_signature.streams[0]
        first_stream.download()


def test_download_with_prefix(cipher_signature, mocker):
    """`download(filename_prefix=...)` should prepend the prefix to the default filename."""
    mocker.patch.object(request, "head")
    request.head.return_value = {"content-length": "16384"}
    mocker.patch.object(request, "stream")
    request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
    # Patch through `mocker` so the original `target_directory` is restored at
    # teardown; a plain assignment on the module would leak into every test
    # that runs afterwards.
    mocker.patch.object(streams, "target_directory", return_value="/target")
    with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
        stream = cipher_signature.streams[0]
        file_path = stream.download(filename_prefix="prefix")
        assert file_path == "/target/prefixPSY - GANGNAM STYLE(강남스타일) MV.mp4"


def test_download_with_filename(cipher_signature, mocker):
    """`download(filename=...)` should use the given name plus the `.mp4` extension."""
    mocker.patch.object(request, "head")
    request.head.return_value = {"content-length": "16384"}
    mocker.patch.object(request, "stream")
    request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
    # Patch through `mocker` so the original `target_directory` is restored at
    # teardown; a plain assignment on the module would leak into every test
    # that runs afterwards.
    mocker.patch.object(streams, "target_directory", return_value="/target")
    with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
        stream = cipher_signature.streams[0]
        file_path = stream.download(filename="cool name bro")
        assert file_path == "/target/cool name bro.mp4"


def test_download_with_existing(cipher_signature, mocker):
    """An existing file whose size equals `filesize` should not be streamed again."""
    mocker.patch.object(request, "head")
    request.head.return_value = {"content-length": "16384"}
    mocker.patch.object(request, "stream")
    # Patch through `mocker` so the original `target_directory` is restored at
    # teardown; a plain assignment on the module would leak into every test
    # that runs afterwards.
    mocker.patch.object(streams, "target_directory", return_value="/target")
    # Pretend the destination file is already on disk...
    mocker.patch.object(os.path, "isfile")
    os.path.isfile.return_value = True
    with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
        stream = cipher_signature.streams[0]
        # ...and that its size matches what the stream reports.
        mocker.patch.object(os.path, "getsize")
        os.path.getsize.return_value = stream.filesize
        file_path = stream.download()
        assert file_path == "/target/PSY - GANGNAM STYLE(강남스타일) MV.mp4"
        assert not request.stream.called


def test_download_with_existing_no_skip(cipher_signature, mocker):
    """With `skip_existing=False` the stream is requested even if a matching file exists."""
    mocker.patch.object(request, "head")
    request.head.return_value = {"content-length": "16384"}
    mocker.patch.object(request, "stream")
    request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
    # Patch through `mocker` so the original `target_directory` is restored at
    # teardown; a plain assignment on the module would leak into every test
    # that runs afterwards.
    mocker.patch.object(streams, "target_directory", return_value="/target")
    # Pretend the destination file is already on disk...
    mocker.patch.object(os.path, "isfile")
    os.path.isfile.return_value = True
    with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
        stream = cipher_signature.streams[0]
        # ...and that its size matches what the stream reports.
        mocker.patch.object(os.path, "getsize")
        os.path.getsize.return_value = stream.filesize
        file_path = stream.download(skip_existing=False)
        assert file_path == "/target/PSY - GANGNAM STYLE(강남스타일) MV.mp4"
        assert request.stream.called


def test_progressive_streams_return_includes_audio_track(cipher_signature):
    """The first progressive stream should report an audio track."""
    progressive_streams = cipher_signature.streams.filter(progressive=True)
    first_progressive = progressive_streams[0]
    assert first_progressive.includes_audio_track


def test_progressive_streams_return_includes_video_track(cipher_signature):
    """The first progressive stream should report a video track."""
    progressive_streams = cipher_signature.streams.filter(progressive=True)
    first_progressive = progressive_streams[0]
    assert first_progressive.includes_video_track


def test_on_progress_hook(cipher_signature, mocker):
    """The registered progress callback is called with three positional args, the first a `Stream`."""
    progress_callback = mock.MagicMock()
    cipher_signature.register_on_progress_callback(progress_callback)

    head_mock = mocker.patch.object(request, "head")
    head_mock.return_value = {"content-length": "16384"}
    stream_mock = mocker.patch.object(request, "stream")
    payload = str(random.getrandbits(8 * 1024))
    stream_mock.return_value = iter([payload])

    fake_open = mock.mock_open()
    with mock.patch("pytube.streams.open", fake_open, create=True):
        cipher_signature.streams[0].download()

    assert progress_callback.called
    # call_args is an (args, kwargs) pair; only the positional part matters here.
    positional, _ = progress_callback.call_args
    assert len(positional) == 3
    reported_stream, _, _ = positional
    assert isinstance(reported_stream, Stream)


def test_on_complete_hook(cipher_signature, mocker):
    """The registered completion callback should have been called after `download()`."""
    complete_callback = mock.MagicMock()
    cipher_signature.register_on_complete_callback(complete_callback)

    head_mock = mocker.patch.object(request, "head")
    head_mock.return_value = {"content-length": "16384"}
    stream_mock = mocker.patch.object(request, "stream")
    payload = str(random.getrandbits(8 * 1024))
    stream_mock.return_value = iter([payload])

    fake_open = mock.mock_open()
    with mock.patch("pytube.streams.open", fake_open, create=True):
        cipher_signature.streams[0].download()

    assert complete_callback.called


def test_author(cipher_signature):
    """Author comes from videoDetails, and is "unknown" when `player_response` is empty."""
    # Author present in the video details.
    known_author = "Test author"
    video_details = {"author": known_author}
    cipher_signature.player_response = {"videoDetails": video_details}
    assert cipher_signature.author == known_author

    # Empty player_response.
    cipher_signature.player_response = {}
    assert cipher_signature.author == "unknown"


def test_thumbnail_when_in_details(cipher_signature):
    """`thumbnail_url` should be the URL of the thumbnail listed in videoDetails."""
    thumbnail_url = "some url"
    thumbnails = [{"url": thumbnail_url}]
    video_details = {"thumbnail": {"thumbnails": thumbnails}}
    cipher_signature.player_response = {"videoDetails": video_details}
    assert cipher_signature.thumbnail_url == thumbnail_url


def test_thumbnail_when_not_in_details(cipher_signature):
    """With an empty `player_response`, `thumbnail_url` should be the img.youtube.com URL."""
    cipher_signature.player_response = {}
    fallback_url = "https://img.youtube.com/vi/9bZkp7q19f0/maxresdefault.jpg"
    assert cipher_signature.thumbnail_url == fallback_url


def test_repr_for_audio_streams(cipher_signature):
    """Check the string form of the first audio-only stream."""
    audio_only = cipher_signature.streams.filter(only_audio=True)
    rendered = str(audio_only[0])
    assert rendered == (
        '<Stream: itag="140" mime_type="audio/mp4" abr="128kbps" '
        'acodec="mp4a.40.2" progressive="False" type="audio">'
    )


def test_repr_for_video_streams(cipher_signature):
    """Check the string form of the first video-only stream."""
    video_only = cipher_signature.streams.filter(only_video=True)
    rendered = str(video_only[0])
    assert rendered == (
        '<Stream: itag="137" mime_type="video/mp4" res="1080p" fps="30fps" '
        'vcodec="avc1.640028" progressive="False" type="video">'
    )


def test_repr_for_progressive_streams(cipher_signature):
    """Check the string form of the first progressive stream."""
    progressive_streams = cipher_signature.streams.filter(progressive=True)
    rendered = str(progressive_streams[0])
    assert rendered == (
        '<Stream: itag="18" mime_type="video/mp4" res="360p" fps="30fps" '
        'vcodec="avc1.42001E" acodec="mp4a.40.2" progressive="True" type="video">'
    )


def test_repr_for_adaptive_streams(cipher_signature):
    """Check the string form of the first adaptive stream."""
    adaptive_streams = cipher_signature.streams.filter(adaptive=True)
    rendered = str(adaptive_streams[0])
    assert rendered == (
        '<Stream: itag="137" mime_type="video/mp4" res="1080p" fps="30fps" '
        'vcodec="avc1.640028" progressive="False" type="video">'
    )