Merge pull request #49 from hbmartin/added-tests-cli
Browse files- .flake8 +1 -1
- pytube/cli.py +54 -70
- pytube/query.py +4 -2
- tests/test_captions.py +27 -3
- tests/test_cli.py +242 -21
- tests/test_query.py +4 -4
- tests/test_streams.py +16 -16
.flake8
CHANGED
@@ -1,5 +1,5 @@
|
|
1 |
[flake8]
|
2 |
ignore = E231,E203,W503,Q000,WPS111,WPS305,WPS348,WPS602,D400,DAR201,S101,DAR101,C812,D104,I001,WPS306,WPS214,D401,WPS229,WPS420,WPS230,WPS414,WPS114,WPS226,WPS442,C819,WPS601,T001,RST304,WPS410,WPS428,A003,A002,I003,WPS221,WPS326,WPS201,S405,DAR301,WPS210,WPS202,WPS213,WPS301,P103,WPS407,WPS432,WPS211,S314,S310,S001,IF100,PT001
|
3 |
-
max-line-length =
|
4 |
|
5 |
[isort]
|
|
|
1 |
[flake8]
|
2 |
ignore = E231,E203,W503,Q000,WPS111,WPS305,WPS348,WPS602,D400,DAR201,S101,DAR101,C812,D104,I001,WPS306,WPS214,D401,WPS229,WPS420,WPS230,WPS414,WPS114,WPS226,WPS442,C819,WPS601,T001,RST304,WPS410,WPS428,A003,A002,I003,WPS221,WPS326,WPS201,S405,DAR301,WPS210,WPS202,WPS213,WPS301,P103,WPS407,WPS432,WPS211,S314,S310,S001,IF100,PT001
|
3 |
+
max-line-length = 95
|
4 |
|
5 |
[isort]
|
pytube/cli.py
CHANGED
@@ -222,14 +222,12 @@ def _download(
|
|
222 |
stream: Stream, target: Optional[str] = None, filename: Optional[str] = None
|
223 |
) -> None:
|
224 |
filesize_megabytes = stream.filesize // 1048576
|
225 |
-
print(f"{stream.default_filename} | {filesize_megabytes} MB")
|
226 |
stream.download(output_path=target, filename=filename)
|
227 |
sys.stdout.write("\n")
|
228 |
|
229 |
|
230 |
-
def
|
231 |
-
base: str, subtype: Optional[str], video_audio: str, target: str
|
232 |
-
) -> str:
|
233 |
"""
|
234 |
Given a base name, the file format, and the target directory, will generate
|
235 |
a filename unique for that directory and file format.
|
@@ -237,15 +235,17 @@ def unique_name(
|
|
237 |
The given base-name.
|
238 |
:param str subtype:
|
239 |
The filetype of the video which will be downloaded.
|
|
|
|
|
240 |
:param Path target:
|
241 |
Target directory for download.
|
242 |
"""
|
243 |
counter = 0
|
244 |
while True:
|
245 |
-
|
246 |
-
|
247 |
-
if not os.path.exists(
|
248 |
-
return
|
249 |
counter += 1
|
250 |
|
251 |
|
@@ -253,7 +253,7 @@ def ffmpeg_process(
|
|
253 |
youtube: YouTube, resolution: str, target: Optional[str] = None
|
254 |
) -> None:
|
255 |
"""
|
256 |
-
Decides the correct video stream to download, then calls
|
257 |
|
258 |
:param YouTube youtube:
|
259 |
A valid YouTube object.
|
@@ -263,91 +263,78 @@ def ffmpeg_process(
|
|
263 |
Target directory for download
|
264 |
"""
|
265 |
youtube.register_on_progress_callback(on_progress)
|
266 |
-
|
267 |
-
target = os.getcwd()
|
268 |
|
269 |
if resolution == "best":
|
270 |
-
|
271 |
-
youtube.streams.filter(progressive=False)
|
272 |
-
.order_by("resolution")
|
273 |
-
.desc()
|
274 |
-
.first()
|
275 |
)
|
276 |
-
|
277 |
-
video_stream = (
|
278 |
youtube.streams.filter(progressive=False, subtype="mp4")
|
279 |
.order_by("resolution")
|
280 |
-
.
|
281 |
-
.first()
|
282 |
)
|
283 |
-
|
284 |
-
|
285 |
-
ffmpeg_downloader(youtube=youtube, stream=video_stream, target=target)
|
286 |
else:
|
287 |
-
|
288 |
else:
|
289 |
video_stream = youtube.streams.filter(
|
290 |
progressive=False, resolution=resolution, subtype="mp4"
|
291 |
).first()
|
292 |
-
if
|
293 |
-
ffmpeg_downloader(youtube=youtube, stream=video_stream, target=target)
|
294 |
-
else:
|
295 |
video_stream = youtube.streams.filter(
|
296 |
progressive=False, resolution=resolution
|
297 |
).first()
|
298 |
-
|
299 |
-
|
300 |
-
|
301 |
-
|
302 |
-
|
303 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
304 |
|
305 |
|
306 |
-
def
|
307 |
"""
|
308 |
Given a YouTube Stream object, finds the correct audio stream, downloads them both
|
309 |
giving them a unique name, them uses ffmpeg to create a new file with the audio
|
310 |
and video from the previously downloaded files. Then deletes the original adaptive
|
311 |
streams, leaving the combination.
|
312 |
|
313 |
-
:param
|
314 |
-
A valid
|
315 |
-
:param Stream
|
316 |
-
A valid Stream object
|
317 |
:param Path target:
|
318 |
A valid Path object
|
319 |
"""
|
320 |
-
|
321 |
-
|
322 |
-
.order_by("abr")
|
323 |
-
.desc()
|
324 |
-
.first()
|
325 |
-
)
|
326 |
-
|
327 |
-
video_unique_name = unique_name(
|
328 |
-
safe_filename(stream.title), stream.subtype, "video", target=target
|
329 |
)
|
330 |
-
audio_unique_name =
|
331 |
-
safe_filename(
|
332 |
)
|
333 |
-
_download(stream=
|
|
|
334 |
_download(stream=audio_stream, target=target, filename=audio_unique_name)
|
335 |
|
336 |
-
video_path = os.path.join(target, f"{video_unique_name}.{
|
337 |
-
audio_path = os.path.join(target, f"{audio_unique_name}.{
|
338 |
-
final_path = os.path.join(
|
|
|
|
|
339 |
|
340 |
subprocess.run( # nosec
|
341 |
-
[
|
342 |
-
"ffmpeg",
|
343 |
-
"-i",
|
344 |
-
f"{video_path}",
|
345 |
-
"-i",
|
346 |
-
f"{audio_path}",
|
347 |
-
"-codec",
|
348 |
-
"copy",
|
349 |
-
f"{final_path}",
|
350 |
-
]
|
351 |
)
|
352 |
os.unlink(video_path)
|
353 |
os.unlink(audio_path)
|
@@ -439,11 +426,11 @@ def download_caption(
|
|
439 |
_print_available_captions(youtube.captions)
|
440 |
return
|
441 |
|
442 |
-
|
443 |
-
|
444 |
downloaded_path = caption.download(title=youtube.title, output_path=target)
|
445 |
print(f"Saved caption file to: {downloaded_path}")
|
446 |
-
|
447 |
print(f"Unable to find caption with code: {lang_code}")
|
448 |
_print_available_captions(youtube.captions)
|
449 |
|
@@ -463,10 +450,7 @@ def download_audio(
|
|
463 |
Target directory for download
|
464 |
"""
|
465 |
audio = (
|
466 |
-
youtube.streams.filter(only_audio=True, subtype=filetype)
|
467 |
-
.order_by("abr")
|
468 |
-
.desc()
|
469 |
-
.first()
|
470 |
)
|
471 |
|
472 |
if audio is None:
|
|
|
222 |
stream: Stream, target: Optional[str] = None, filename: Optional[str] = None
|
223 |
) -> None:
|
224 |
filesize_megabytes = stream.filesize // 1048576
|
225 |
+
print(f"{filename or stream.default_filename} | {filesize_megabytes} MB")
|
226 |
stream.download(output_path=target, filename=filename)
|
227 |
sys.stdout.write("\n")
|
228 |
|
229 |
|
230 |
+
def _unique_name(base: str, subtype: str, media_type: str, target: str) -> str:
|
|
|
|
|
231 |
"""
|
232 |
Given a base name, the file format, and the target directory, will generate
|
233 |
a filename unique for that directory and file format.
|
|
|
235 |
The given base-name.
|
236 |
:param str subtype:
|
237 |
The filetype of the video which will be downloaded.
|
238 |
+
:param str media_type:
|
239 |
+
The media_type of the file, ie. "audio" or "video"
|
240 |
:param Path target:
|
241 |
Target directory for download.
|
242 |
"""
|
243 |
counter = 0
|
244 |
while True:
|
245 |
+
file_name = f"{base}_{media_type}_{counter}"
|
246 |
+
file_path = os.path.join(target, f"{file_name}.{subtype}")
|
247 |
+
if not os.path.exists(file_path):
|
248 |
+
return file_name
|
249 |
counter += 1
|
250 |
|
251 |
|
|
|
253 |
youtube: YouTube, resolution: str, target: Optional[str] = None
|
254 |
) -> None:
|
255 |
"""
|
256 |
+
Decides the correct video stream to download, then calls _ffmpeg_downloader.
|
257 |
|
258 |
:param YouTube youtube:
|
259 |
A valid YouTube object.
|
|
|
263 |
Target directory for download
|
264 |
"""
|
265 |
youtube.register_on_progress_callback(on_progress)
|
266 |
+
target = target or os.getcwd()
|
|
|
267 |
|
268 |
if resolution == "best":
|
269 |
+
highest_quality_stream = (
|
270 |
+
youtube.streams.filter(progressive=False).order_by("resolution").last()
|
|
|
|
|
|
|
271 |
)
|
272 |
+
mp4_stream = (
|
|
|
273 |
youtube.streams.filter(progressive=False, subtype="mp4")
|
274 |
.order_by("resolution")
|
275 |
+
.last()
|
|
|
276 |
)
|
277 |
+
if highest_quality_stream.resolution == mp4_stream.resolution:
|
278 |
+
video_stream = mp4_stream
|
|
|
279 |
else:
|
280 |
+
video_stream = highest_quality_stream
|
281 |
else:
|
282 |
video_stream = youtube.streams.filter(
|
283 |
progressive=False, resolution=resolution, subtype="mp4"
|
284 |
).first()
|
285 |
+
if not video_stream:
|
|
|
|
|
286 |
video_stream = youtube.streams.filter(
|
287 |
progressive=False, resolution=resolution
|
288 |
).first()
|
289 |
+
if video_stream is None:
|
290 |
+
print(f"Could not find a stream with resolution: {resolution}")
|
291 |
+
print("Try one of these:")
|
292 |
+
display_streams(youtube)
|
293 |
+
sys.exit()
|
294 |
+
|
295 |
+
audio_stream = youtube.streams.get_audio_only(video_stream.subtype)
|
296 |
+
if not audio_stream:
|
297 |
+
audio_stream = youtube.streams.filter(only_audio=True).order_by("abr").last()
|
298 |
+
if not audio_stream:
|
299 |
+
print("Could not find an audio only stream")
|
300 |
+
sys.exit()
|
301 |
+
_ffmpeg_downloader(
|
302 |
+
audio_stream=audio_stream, video_stream=video_stream, target=target
|
303 |
+
)
|
304 |
|
305 |
|
306 |
+
def _ffmpeg_downloader(audio_stream: Stream, video_stream: Stream, target: str) -> None:
|
307 |
"""
|
308 |
Given a YouTube Stream object, finds the correct audio stream, downloads them both
|
309 |
giving them a unique name, them uses ffmpeg to create a new file with the audio
|
310 |
and video from the previously downloaded files. Then deletes the original adaptive
|
311 |
streams, leaving the combination.
|
312 |
|
313 |
+
:param Stream audio_stream:
|
314 |
+
A valid Stream object representing the audio to download
|
315 |
+
:param Stream video_stream:
|
316 |
+
A valid Stream object representing the video to download
|
317 |
:param Path target:
|
318 |
A valid Path object
|
319 |
"""
|
320 |
+
video_unique_name = _unique_name(
|
321 |
+
safe_filename(video_stream.title), video_stream.subtype, "video", target=target
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
322 |
)
|
323 |
+
audio_unique_name = _unique_name(
|
324 |
+
safe_filename(video_stream.title), audio_stream.subtype, "audio", target=target
|
325 |
)
|
326 |
+
_download(stream=video_stream, target=target, filename=video_unique_name)
|
327 |
+
print("Loading audio...")
|
328 |
_download(stream=audio_stream, target=target, filename=audio_unique_name)
|
329 |
|
330 |
+
video_path = os.path.join(target, f"{video_unique_name}.{video_stream.subtype}")
|
331 |
+
audio_path = os.path.join(target, f"{audio_unique_name}.{audio_stream.subtype}")
|
332 |
+
final_path = os.path.join(
|
333 |
+
target, f"{safe_filename(video_stream.title)}.{video_stream.subtype}"
|
334 |
+
)
|
335 |
|
336 |
subprocess.run( # nosec
|
337 |
+
["ffmpeg", "-i", video_path, "-i", audio_path, "-codec", "copy", final_path,]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
338 |
)
|
339 |
os.unlink(video_path)
|
340 |
os.unlink(audio_path)
|
|
|
426 |
_print_available_captions(youtube.captions)
|
427 |
return
|
428 |
|
429 |
+
try:
|
430 |
+
caption = youtube.captions[lang_code]
|
431 |
downloaded_path = caption.download(title=youtube.title, output_path=target)
|
432 |
print(f"Saved caption file to: {downloaded_path}")
|
433 |
+
except KeyError:
|
434 |
print(f"Unable to find caption with code: {lang_code}")
|
435 |
_print_available_captions(youtube.captions)
|
436 |
|
|
|
450 |
Target directory for download
|
451 |
"""
|
452 |
audio = (
|
453 |
+
youtube.streams.filter(only_audio=True, subtype=filetype).order_by("abr").last()
|
|
|
|
|
|
|
454 |
)
|
455 |
|
456 |
if audio is None:
|
pytube/query.py
CHANGED
@@ -369,7 +369,9 @@ class CaptionQuery(Mapping):
|
|
369 |
self.lang_code_index = {c.code: c for c in captions}
|
370 |
|
371 |
@deprecated("This object can be treated as a dictionary, i.e. captions['en']")
|
372 |
-
def get_by_language_code(
|
|
|
|
|
373 |
"""Get the :class:`Caption <Caption>` for a given ``lang_code``.
|
374 |
|
375 |
:param str lang_code:
|
@@ -397,7 +399,7 @@ class CaptionQuery(Mapping):
|
|
397 |
return len(self.lang_code_index)
|
398 |
|
399 |
def __iter__(self):
|
400 |
-
return iter(self.lang_code_index)
|
401 |
|
402 |
def __repr__(self) -> str:
|
403 |
return f"{self.lang_code_index}"
|
|
|
369 |
self.lang_code_index = {c.code: c for c in captions}
|
370 |
|
371 |
@deprecated("This object can be treated as a dictionary, i.e. captions['en']")
|
372 |
+
def get_by_language_code(
|
373 |
+
self, lang_code: str
|
374 |
+
) -> Optional[Caption]: # pragma: no cover
|
375 |
"""Get the :class:`Caption <Caption>` for a given ``lang_code``.
|
376 |
|
377 |
:param str lang_code:
|
|
|
399 |
return len(self.lang_code_index)
|
400 |
|
401 |
def __iter__(self):
|
402 |
+
return iter(self.lang_code_index.values())
|
403 |
|
404 |
def __repr__(self) -> str:
|
405 |
return f"{self.lang_code_index}"
|
tests/test_captions.py
CHANGED
@@ -26,7 +26,8 @@ def test_caption_query_sequence():
|
|
26 |
assert caption_query["en"] == caption1
|
27 |
assert caption_query["fr"] == caption2
|
28 |
with pytest.raises(KeyError):
|
29 |
-
caption_query["nada"]
|
|
|
30 |
|
31 |
|
32 |
def test_caption_query_get_by_language_code_when_exists():
|
@@ -37,7 +38,7 @@ def test_caption_query_get_by_language_code_when_exists():
|
|
37 |
{"url": "url2", "name": {"simpleText": "name2"}, "languageCode": "fr"}
|
38 |
)
|
39 |
caption_query = CaptionQuery(captions=[caption1, caption2])
|
40 |
-
assert caption_query
|
41 |
|
42 |
|
43 |
def test_caption_query_get_by_language_code_when_not_exists():
|
@@ -48,7 +49,9 @@ def test_caption_query_get_by_language_code_when_not_exists():
|
|
48 |
{"url": "url2", "name": {"simpleText": "name2"}, "languageCode": "fr"}
|
49 |
)
|
50 |
caption_query = CaptionQuery(captions=[caption1, caption2])
|
51 |
-
|
|
|
|
|
52 |
|
53 |
|
54 |
@mock.patch("pytube.captions.Caption.generate_srt_captions")
|
@@ -118,3 +121,24 @@ def test_xml_captions(request_get):
|
|
118 |
{"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
|
119 |
)
|
120 |
assert caption.xml_captions == "test"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
26 |
assert caption_query["en"] == caption1
|
27 |
assert caption_query["fr"] == caption2
|
28 |
with pytest.raises(KeyError):
|
29 |
+
not_exists = caption_query["nada"]
|
30 |
+
assert not_exists is not None # should never reach this
|
31 |
|
32 |
|
33 |
def test_caption_query_get_by_language_code_when_exists():
|
|
|
38 |
{"url": "url2", "name": {"simpleText": "name2"}, "languageCode": "fr"}
|
39 |
)
|
40 |
caption_query = CaptionQuery(captions=[caption1, caption2])
|
41 |
+
assert caption_query["en"] == caption1
|
42 |
|
43 |
|
44 |
def test_caption_query_get_by_language_code_when_not_exists():
|
|
|
49 |
{"url": "url2", "name": {"simpleText": "name2"}, "languageCode": "fr"}
|
50 |
)
|
51 |
caption_query = CaptionQuery(captions=[caption1, caption2])
|
52 |
+
with pytest.raises(KeyError):
|
53 |
+
not_found = caption_query["hello"]
|
54 |
+
assert not_found is not None # should never reach here
|
55 |
|
56 |
|
57 |
@mock.patch("pytube.captions.Caption.generate_srt_captions")
|
|
|
121 |
{"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
|
122 |
)
|
123 |
assert caption.xml_captions == "test"
|
124 |
+
|
125 |
+
|
126 |
+
@mock.patch("pytube.captions.request")
|
127 |
+
def test_generate_srt_captions(request):
|
128 |
+
request.get.return_value = (
|
129 |
+
'<?xml version="1.0" encoding="utf-8" ?><transcript><text start="6.5" dur="1.7">['
|
130 |
+
'Herb, Software Engineer]\n本影片包含隱藏式字幕。</text><text start="8.3" dur="2.7">'
|
131 |
+
"如要啓動字幕,請按一下這裡的圖示。</text></transcript>"
|
132 |
+
)
|
133 |
+
caption = Caption(
|
134 |
+
{"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
|
135 |
+
)
|
136 |
+
assert caption.generate_srt_captions() == (
|
137 |
+
"1\n"
|
138 |
+
"00:00:06,500 --> 00:00:08,200\n"
|
139 |
+
"[Herb, Software Engineer] 本影片包含隱藏式字幕。\n"
|
140 |
+
"\n"
|
141 |
+
"2\n"
|
142 |
+
"00:00:08,300 --> 00:00:11,000\n"
|
143 |
+
"如要啓動字幕,請按一下這裡的圖示。"
|
144 |
+
)
|
tests/test_cli.py
CHANGED
@@ -6,10 +6,20 @@ from unittest.mock import MagicMock, patch
|
|
6 |
import pytest
|
7 |
|
8 |
from pytube import cli, StreamQuery, Caption, CaptionQuery
|
|
|
9 |
|
10 |
parse_args = cli._parse_args
|
11 |
|
12 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
13 |
@mock.patch("pytube.cli.display_streams")
|
14 |
@mock.patch("pytube.cli.YouTube")
|
15 |
def test_download_when_itag_not_found(youtube, display_streams):
|
@@ -91,6 +101,22 @@ def test_download_caption_with_lang_not_found(youtube, print_available):
|
|
91 |
print_available.assert_called_with(youtube.captions)
|
92 |
|
93 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
94 |
def test_display_progress_bar(capsys):
|
95 |
cli.display_progress_bar(bytes_received=25, filesize=100, scale=0.55)
|
96 |
out, _ = capsys.readouterr()
|
@@ -192,14 +218,46 @@ def test_download_by_resolution_flag(youtube, download_by_resolution):
|
|
192 |
download_by_resolution.assert_called()
|
193 |
|
194 |
|
|
|
195 |
@mock.patch("pytube.cli.Playlist")
|
196 |
-
|
|
|
|
|
197 |
cli.safe_filename = MagicMock(return_value="safe_title")
|
198 |
parser = argparse.ArgumentParser()
|
199 |
args = parse_args(parser, ["https://www.youtube.com/playlist?list=PLyn"])
|
200 |
cli._parse_args = MagicMock(return_value=args)
|
|
|
|
|
|
|
|
|
201 |
cli.main()
|
|
|
202 |
playlist.assert_called()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
203 |
|
204 |
|
205 |
@mock.patch("pytube.cli.YouTube")
|
@@ -227,39 +285,191 @@ def test_download_by_resolution_not_exists(youtube, stream_query):
|
|
227 |
|
228 |
|
229 |
@mock.patch("pytube.cli.YouTube")
|
230 |
-
@mock.patch("pytube.cli.
|
231 |
-
def
|
|
|
232 |
parser = argparse.ArgumentParser()
|
233 |
args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "-f", "best"])
|
234 |
cli._parse_args = MagicMock(return_value=args)
|
235 |
-
|
236 |
-
cli.
|
237 |
-
|
238 |
-
|
239 |
-
|
240 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
241 |
|
242 |
|
|
|
243 |
@mock.patch("pytube.cli.YouTube.__init__", return_value=None)
|
244 |
-
def
|
|
|
245 |
parser = argparse.ArgumentParser()
|
246 |
args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "-a", "mp4"])
|
247 |
cli._parse_args = MagicMock(return_value=args)
|
248 |
-
|
249 |
cli.main()
|
|
|
250 |
youtube.assert_called()
|
251 |
-
|
252 |
|
253 |
|
254 |
-
@mock.patch("pytube.cli.
|
255 |
-
|
256 |
-
|
257 |
-
|
258 |
-
|
259 |
-
|
260 |
-
|
261 |
-
|
262 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
263 |
|
264 |
|
265 |
@mock.patch("pytube.cli.YouTube.__init__", return_value=None)
|
@@ -271,3 +481,14 @@ def test_perform_args_on_youtube(youtube):
|
|
271 |
cli.main()
|
272 |
youtube.assert_called()
|
273 |
cli._perform_args_on_youtube.assert_called()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
6 |
import pytest
|
7 |
|
8 |
from pytube import cli, StreamQuery, Caption, CaptionQuery
|
9 |
+
from pytube.exceptions import PytubeError
|
10 |
|
11 |
parse_args = cli._parse_args
|
12 |
|
13 |
|
14 |
+
@mock.patch("pytube.cli._parse_args")
|
15 |
+
def test_main_invalid_url(_parse_args):
|
16 |
+
parser = argparse.ArgumentParser()
|
17 |
+
args = parse_args(parser, ["crikey",],)
|
18 |
+
_parse_args.return_value = args
|
19 |
+
with pytest.raises(SystemExit):
|
20 |
+
cli.main()
|
21 |
+
|
22 |
+
|
23 |
@mock.patch("pytube.cli.display_streams")
|
24 |
@mock.patch("pytube.cli.YouTube")
|
25 |
def test_download_when_itag_not_found(youtube, display_streams):
|
|
|
101 |
print_available.assert_called_with(youtube.captions)
|
102 |
|
103 |
|
104 |
+
def test_print_available_captions(capsys):
|
105 |
+
# Given
|
106 |
+
caption1 = Caption(
|
107 |
+
{"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
|
108 |
+
)
|
109 |
+
caption2 = Caption(
|
110 |
+
{"url": "url2", "name": {"simpleText": "name2"}, "languageCode": "fr"}
|
111 |
+
)
|
112 |
+
query = CaptionQuery([caption1, caption2])
|
113 |
+
# When
|
114 |
+
cli._print_available_captions(query)
|
115 |
+
# Then
|
116 |
+
captured = capsys.readouterr()
|
117 |
+
assert captured.out == "Available caption codes are: en, fr\n"
|
118 |
+
|
119 |
+
|
120 |
def test_display_progress_bar(capsys):
|
121 |
cli.display_progress_bar(bytes_received=25, filesize=100, scale=0.55)
|
122 |
out, _ = capsys.readouterr()
|
|
|
218 |
download_by_resolution.assert_called()
|
219 |
|
220 |
|
221 |
+
@mock.patch("pytube.cli.YouTube")
|
222 |
@mock.patch("pytube.cli.Playlist")
|
223 |
+
@mock.patch("pytube.cli._perform_args_on_youtube")
|
224 |
+
def test_download_with_playlist(perform_args_on_youtube, playlist, youtube):
|
225 |
+
# Given
|
226 |
cli.safe_filename = MagicMock(return_value="safe_title")
|
227 |
parser = argparse.ArgumentParser()
|
228 |
args = parse_args(parser, ["https://www.youtube.com/playlist?list=PLyn"])
|
229 |
cli._parse_args = MagicMock(return_value=args)
|
230 |
+
videos = [youtube]
|
231 |
+
playlist_instance = playlist.return_value
|
232 |
+
playlist_instance.videos = videos
|
233 |
+
# When
|
234 |
cli.main()
|
235 |
+
# Then
|
236 |
playlist.assert_called()
|
237 |
+
perform_args_on_youtube.assert_called_with(youtube, args)
|
238 |
+
|
239 |
+
|
240 |
+
@mock.patch("pytube.cli.YouTube")
|
241 |
+
@mock.patch("pytube.cli.Playlist")
|
242 |
+
@mock.patch("pytube.cli._perform_args_on_youtube")
|
243 |
+
def test_download_with_playlist_video_error(
|
244 |
+
perform_args_on_youtube, playlist, youtube, capsys
|
245 |
+
):
|
246 |
+
# Given
|
247 |
+
cli.safe_filename = MagicMock(return_value="safe_title")
|
248 |
+
parser = argparse.ArgumentParser()
|
249 |
+
args = parse_args(parser, ["https://www.youtube.com/playlist?list=PLyn"])
|
250 |
+
cli._parse_args = MagicMock(return_value=args)
|
251 |
+
videos = [youtube]
|
252 |
+
playlist_instance = playlist.return_value
|
253 |
+
playlist_instance.videos = videos
|
254 |
+
perform_args_on_youtube.side_effect = PytubeError()
|
255 |
+
# When
|
256 |
+
cli.main()
|
257 |
+
# Then
|
258 |
+
playlist.assert_called()
|
259 |
+
captured = capsys.readouterr()
|
260 |
+
assert "There was an error with video" in captured.out
|
261 |
|
262 |
|
263 |
@mock.patch("pytube.cli.YouTube")
|
|
|
285 |
|
286 |
|
287 |
@mock.patch("pytube.cli.YouTube")
|
288 |
+
@mock.patch("pytube.cli.ffmpeg_process")
|
289 |
+
def test_perform_args_should_ffmpeg_process(ffmpeg_process, youtube):
|
290 |
+
# Given
|
291 |
parser = argparse.ArgumentParser()
|
292 |
args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "-f", "best"])
|
293 |
cli._parse_args = MagicMock(return_value=args)
|
294 |
+
# When
|
295 |
+
cli._perform_args_on_youtube(youtube, args)
|
296 |
+
# Then
|
297 |
+
ffmpeg_process.assert_called_with(youtube=youtube, resolution="best", target=None)
|
298 |
+
|
299 |
+
|
300 |
+
@mock.patch("pytube.cli.YouTube")
|
301 |
+
@mock.patch("pytube.cli._ffmpeg_downloader")
|
302 |
+
def test_ffmpeg_process_best_should_download(_ffmpeg_downloader, youtube):
|
303 |
+
# Given
|
304 |
+
target = "/target"
|
305 |
+
streams = MagicMock()
|
306 |
+
youtube.streams = streams
|
307 |
+
video_stream = MagicMock()
|
308 |
+
streams.filter.return_value.order_by.return_value.last.return_value = video_stream
|
309 |
+
audio_stream = MagicMock()
|
310 |
+
streams.get_audio_only.return_value = audio_stream
|
311 |
+
# When
|
312 |
+
cli.ffmpeg_process(youtube, "best", target)
|
313 |
+
# Then
|
314 |
+
_ffmpeg_downloader.assert_called_with(
|
315 |
+
audio_stream=audio_stream, video_stream=video_stream, target=target
|
316 |
+
)
|
317 |
+
|
318 |
+
|
319 |
+
@mock.patch("pytube.cli.YouTube")
|
320 |
+
@mock.patch("pytube.cli._ffmpeg_downloader")
|
321 |
+
def test_ffmpeg_process_res_should_download(_ffmpeg_downloader, youtube):
|
322 |
+
# Given
|
323 |
+
target = "/target"
|
324 |
+
streams = MagicMock()
|
325 |
+
youtube.streams = streams
|
326 |
+
video_stream = MagicMock()
|
327 |
+
streams.filter.return_value.first.return_value = video_stream
|
328 |
+
audio_stream = MagicMock()
|
329 |
+
streams.get_audio_only.return_value = audio_stream
|
330 |
+
# When
|
331 |
+
cli.ffmpeg_process(youtube, "XYZp", target)
|
332 |
+
# Then
|
333 |
+
_ffmpeg_downloader.assert_called_with(
|
334 |
+
audio_stream=audio_stream, video_stream=video_stream, target=target
|
335 |
+
)
|
336 |
+
|
337 |
+
|
338 |
+
@mock.patch("pytube.cli.YouTube")
|
339 |
+
@mock.patch("pytube.cli._ffmpeg_downloader")
|
340 |
+
def test_ffmpeg_process_res_none_should_not_download(_ffmpeg_downloader, youtube):
|
341 |
+
# Given
|
342 |
+
target = "/target"
|
343 |
+
streams = MagicMock()
|
344 |
+
youtube.streams = streams
|
345 |
+
streams.filter.return_value.first.return_value = None
|
346 |
+
audio_stream = MagicMock()
|
347 |
+
streams.get_audio_only.return_value = audio_stream
|
348 |
+
# When
|
349 |
+
with pytest.raises(SystemExit):
|
350 |
+
cli.ffmpeg_process(youtube, "XYZp", target)
|
351 |
+
# Then
|
352 |
+
_ffmpeg_downloader.assert_not_called()
|
353 |
+
|
354 |
+
|
355 |
+
@mock.patch("pytube.cli.YouTube")
|
356 |
+
@mock.patch("pytube.cli._ffmpeg_downloader")
|
357 |
+
def test_ffmpeg_process_audio_none_should_fallback_download(
|
358 |
+
_ffmpeg_downloader, youtube
|
359 |
+
):
|
360 |
+
# Given
|
361 |
+
target = "/target"
|
362 |
+
streams = MagicMock()
|
363 |
+
youtube.streams = streams
|
364 |
+
stream = MagicMock()
|
365 |
+
streams.filter.return_value.order_by.return_value.last.return_value = stream
|
366 |
+
streams.get_audio_only.return_value = None
|
367 |
+
# When
|
368 |
+
cli.ffmpeg_process(youtube, "best", target)
|
369 |
+
# Then
|
370 |
+
_ffmpeg_downloader.assert_called_with(
|
371 |
+
audio_stream=stream, video_stream=stream, target=target
|
372 |
+
)
|
373 |
+
|
374 |
+
|
375 |
+
@mock.patch("pytube.cli.YouTube")
|
376 |
+
@mock.patch("pytube.cli._ffmpeg_downloader")
|
377 |
+
def test_ffmpeg_process_audio_fallback_none_should_exit(_ffmpeg_downloader, youtube):
|
378 |
+
# Given
|
379 |
+
target = "/target"
|
380 |
+
streams = MagicMock()
|
381 |
+
youtube.streams = streams
|
382 |
+
stream = MagicMock()
|
383 |
+
streams.filter.return_value.order_by.return_value.last.side_effect = [
|
384 |
+
stream,
|
385 |
+
stream,
|
386 |
+
None,
|
387 |
+
]
|
388 |
+
streams.get_audio_only.return_value = None
|
389 |
+
# When
|
390 |
+
with pytest.raises(SystemExit):
|
391 |
+
cli.ffmpeg_process(youtube, "best", target)
|
392 |
+
# Then
|
393 |
+
_ffmpeg_downloader.assert_not_called()
|
394 |
+
|
395 |
+
|
396 |
+
@mock.patch("pytube.cli.os.unlink", return_value=None)
|
397 |
+
@mock.patch("pytube.cli.subprocess.run", return_value=None)
|
398 |
+
@mock.patch("pytube.cli._download", return_value=None)
|
399 |
+
@mock.patch("pytube.cli._unique_name", return_value=None)
|
400 |
+
def test_ffmpeg_downloader(unique_name, download, run, unlink):
|
401 |
+
# Given
|
402 |
+
target = "target"
|
403 |
+
audio_stream = MagicMock()
|
404 |
+
video_stream = MagicMock()
|
405 |
+
video_stream.id = "video_id"
|
406 |
+
audio_stream.subtype = "audio_subtype"
|
407 |
+
video_stream.subtype = "video_subtype"
|
408 |
+
unique_name.side_effect = ["video_name", "audio_name"]
|
409 |
+
|
410 |
+
# When
|
411 |
+
cli._ffmpeg_downloader(
|
412 |
+
audio_stream=audio_stream, video_stream=video_stream, target=target
|
413 |
+
)
|
414 |
+
# Then
|
415 |
+
download.assert_called()
|
416 |
+
run.assert_called_with(
|
417 |
+
[
|
418 |
+
"ffmpeg",
|
419 |
+
"-i",
|
420 |
+
"target/video_name.video_subtype",
|
421 |
+
"-i",
|
422 |
+
"target/audio_name.audio_subtype",
|
423 |
+
"-codec",
|
424 |
+
"copy",
|
425 |
+
"target/safe_title.video_subtype",
|
426 |
+
]
|
427 |
+
)
|
428 |
+
unlink.assert_called()
|
429 |
|
430 |
|
431 |
+
@mock.patch("pytube.cli.download_audio")
|
432 |
@mock.patch("pytube.cli.YouTube.__init__", return_value=None)
|
433 |
+
def test_download_audio_args(youtube, download_audio):
|
434 |
+
# Given
|
435 |
parser = argparse.ArgumentParser()
|
436 |
args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "-a", "mp4"])
|
437 |
cli._parse_args = MagicMock(return_value=args)
|
438 |
+
# When
|
439 |
cli.main()
|
440 |
+
# Then
|
441 |
youtube.assert_called()
|
442 |
+
download_audio.assert_called()
|
443 |
|
444 |
|
445 |
+
@mock.patch("pytube.cli._download")
|
446 |
+
@mock.patch("pytube.cli.YouTube")
|
447 |
+
def test_download_audio(youtube, download):
|
448 |
+
# Given
|
449 |
+
youtube_instance = youtube.return_value
|
450 |
+
audio_stream = MagicMock()
|
451 |
+
youtube_instance.streams.filter.return_value.order_by.return_value.last.return_value = (
|
452 |
+
audio_stream
|
453 |
+
)
|
454 |
+
# When
|
455 |
+
cli.download_audio(youtube_instance, "filetype", "target")
|
456 |
+
# Then
|
457 |
+
download.assert_called_with(audio_stream, target="target")
|
458 |
+
|
459 |
+
|
460 |
+
@mock.patch("pytube.cli._download")
|
461 |
+
@mock.patch("pytube.cli.YouTube")
|
462 |
+
def test_download_audio_none(youtube, download):
|
463 |
+
# Given
|
464 |
+
youtube_instance = youtube.return_value
|
465 |
+
youtube_instance.streams.filter.return_value.order_by.return_value.last.return_value = (
|
466 |
+
None
|
467 |
+
)
|
468 |
+
# When
|
469 |
+
with pytest.raises(SystemExit):
|
470 |
+
cli.download_audio(youtube_instance, "filetype", "target")
|
471 |
+
# Then
|
472 |
+
download.assert_not_called()
|
473 |
|
474 |
|
475 |
@mock.patch("pytube.cli.YouTube.__init__", return_value=None)
|
|
|
481 |
cli.main()
|
482 |
youtube.assert_called()
|
483 |
cli._perform_args_on_youtube.assert_called()
|
484 |
+
|
485 |
+
|
486 |
+
@mock.patch("pytube.cli.os.path.exists", return_value=False)
|
487 |
+
def test_unique_name(path_exists):
|
488 |
+
assert cli._unique_name("base", "subtype", "video", "target") == "base_video_0"
|
489 |
+
|
490 |
+
|
491 |
+
@mock.patch("pytube.cli.os.path.exists")
|
492 |
+
def test_unique_name_counter(path_exists):
|
493 |
+
path_exists.side_effect = [True, False]
|
494 |
+
assert cli._unique_name("base", "subtype", "video", "target") == "base_video_1"
|
tests/test_query.py
CHANGED
@@ -44,14 +44,14 @@ def test_get_last(cipher_signature):
|
|
44 |
"""Ensure :meth:`~pytube.StreamQuery.last` returns the expected
|
45 |
:class:`Stream <Stream>`.
|
46 |
"""
|
47 |
-
assert cipher_signature.streams.
|
48 |
|
49 |
|
50 |
def test_get_first(cipher_signature):
|
51 |
"""Ensure :meth:`~pytube.StreamQuery.first` returns the expected
|
52 |
:class:`Stream <Stream>`.
|
53 |
"""
|
54 |
-
assert cipher_signature.streams.
|
55 |
|
56 |
|
57 |
def test_order_by(cipher_signature):
|
@@ -154,10 +154,10 @@ def test_sequence(cipher_signature):
|
|
154 |
|
155 |
|
156 |
def test_otf(cipher_signature):
|
157 |
-
non_otf = cipher_signature.streams.otf()
|
158 |
assert len(non_otf) == 22
|
159 |
|
160 |
-
otf = cipher_signature.streams.otf(True)
|
161 |
assert len(otf) == 0
|
162 |
|
163 |
|
|
|
44 |
"""Ensure :meth:`~pytube.StreamQuery.last` returns the expected
|
45 |
:class:`Stream <Stream>`.
|
46 |
"""
|
47 |
+
assert cipher_signature.streams[-1].itag == 251
|
48 |
|
49 |
|
50 |
def test_get_first(cipher_signature):
|
51 |
"""Ensure :meth:`~pytube.StreamQuery.first` returns the expected
|
52 |
:class:`Stream <Stream>`.
|
53 |
"""
|
54 |
+
assert cipher_signature.streams[0].itag == 18
|
55 |
|
56 |
|
57 |
def test_order_by(cipher_signature):
|
|
|
154 |
|
155 |
|
156 |
def test_otf(cipher_signature):
|
157 |
+
non_otf = cipher_signature.streams.otf()
|
158 |
assert len(non_otf) == 22
|
159 |
|
160 |
+
otf = cipher_signature.streams.otf(True)
|
161 |
assert len(otf) == 0
|
162 |
|
163 |
|
tests/test_streams.py
CHANGED
@@ -11,13 +11,13 @@ from pytube import Stream, streams
|
|
11 |
def test_filesize(cipher_signature, mocker):
|
12 |
mocker.patch.object(request, "head")
|
13 |
request.head.return_value = {"content-length": "6796391"}
|
14 |
-
assert cipher_signature.streams.
|
15 |
|
16 |
|
17 |
def test_filesize_approx(cipher_signature, mocker):
|
18 |
mocker.patch.object(request, "head")
|
19 |
request.head.return_value = {"content-length": "123"}
|
20 |
-
stream = cipher_signature.streams
|
21 |
assert stream.filesize_approx == 22350604
|
22 |
stream.bitrate = None
|
23 |
assert stream.filesize_approx == 123
|
@@ -25,7 +25,7 @@ def test_filesize_approx(cipher_signature, mocker):
|
|
25 |
|
26 |
def test_default_filename(cipher_signature):
|
27 |
expected = "PSY - GANGNAM STYLE(강남스타일) MV.mp4"
|
28 |
-
stream = cipher_signature.streams
|
29 |
assert stream.default_filename == expected
|
30 |
|
31 |
|
@@ -103,7 +103,7 @@ def test_download(cipher_signature, mocker):
|
|
103 |
mocker.patch.object(request, "stream")
|
104 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
105 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
106 |
-
stream = cipher_signature.streams
|
107 |
stream.download()
|
108 |
|
109 |
|
@@ -114,7 +114,7 @@ def test_download_with_prefix(cipher_signature, mocker):
|
|
114 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
115 |
streams.target_directory = MagicMock(return_value="/target")
|
116 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
117 |
-
stream = cipher_signature.streams
|
118 |
file_path = stream.download(filename_prefix="prefix")
|
119 |
assert file_path == "/target/prefixPSY - GANGNAM STYLE(강남스타일) MV.mp4"
|
120 |
|
@@ -126,7 +126,7 @@ def test_download_with_filename(cipher_signature, mocker):
|
|
126 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
127 |
streams.target_directory = MagicMock(return_value="/target")
|
128 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
129 |
-
stream = cipher_signature.streams
|
130 |
file_path = stream.download(filename="cool name bro")
|
131 |
assert file_path == "/target/cool name bro.mp4"
|
132 |
|
@@ -139,7 +139,7 @@ def test_download_with_existing(cipher_signature, mocker):
|
|
139 |
mocker.patch.object(os.path, "isfile")
|
140 |
os.path.isfile.return_value = True
|
141 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
142 |
-
stream = cipher_signature.streams
|
143 |
mocker.patch.object(os.path, "getsize")
|
144 |
os.path.getsize.return_value = stream.filesize
|
145 |
file_path = stream.download()
|
@@ -156,7 +156,7 @@ def test_download_with_existing_no_skip(cipher_signature, mocker):
|
|
156 |
mocker.patch.object(os.path, "isfile")
|
157 |
os.path.isfile.return_value = True
|
158 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
159 |
-
stream = cipher_signature.streams
|
160 |
mocker.patch.object(os.path, "getsize")
|
161 |
os.path.getsize.return_value = stream.filesize
|
162 |
file_path = stream.download(skip_existing=False)
|
@@ -165,12 +165,12 @@ def test_download_with_existing_no_skip(cipher_signature, mocker):
|
|
165 |
|
166 |
|
167 |
def test_progressive_streams_return_includes_audio_track(cipher_signature):
|
168 |
-
stream = cipher_signature.streams.filter(progressive=True)
|
169 |
assert stream.includes_audio_track
|
170 |
|
171 |
|
172 |
def test_progressive_streams_return_includes_video_track(cipher_signature):
|
173 |
-
stream = cipher_signature.streams.filter(progressive=True)
|
174 |
assert stream.includes_video_track
|
175 |
|
176 |
|
@@ -184,7 +184,7 @@ def test_on_progress_hook(cipher_signature, mocker):
|
|
184 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
185 |
|
186 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
187 |
-
stream = cipher_signature.streams
|
188 |
stream.download()
|
189 |
assert callback_fn.called
|
190 |
args, _ = callback_fn.call_args
|
@@ -203,7 +203,7 @@ def test_on_complete_hook(cipher_signature, mocker):
|
|
203 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
204 |
|
205 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
206 |
-
stream = cipher_signature.streams
|
207 |
stream.download()
|
208 |
assert callback_fn.called
|
209 |
|
@@ -233,7 +233,7 @@ def test_thumbnail_when_not_in_details(cipher_signature):
|
|
233 |
|
234 |
|
235 |
def test_repr_for_audio_streams(cipher_signature):
|
236 |
-
stream = str(cipher_signature.streams.filter(only_audio=True)
|
237 |
expected = (
|
238 |
'<Stream: itag="140" mime_type="audio/mp4" abr="128kbps" '
|
239 |
'acodec="mp4a.40.2" progressive="False" type="audio">'
|
@@ -242,7 +242,7 @@ def test_repr_for_audio_streams(cipher_signature):
|
|
242 |
|
243 |
|
244 |
def test_repr_for_video_streams(cipher_signature):
|
245 |
-
stream = str(cipher_signature.streams.filter(only_video=True)
|
246 |
expected = (
|
247 |
'<Stream: itag="137" mime_type="video/mp4" res="1080p" fps="30fps" '
|
248 |
'vcodec="avc1.640028" progressive="False" type="video">'
|
@@ -251,7 +251,7 @@ def test_repr_for_video_streams(cipher_signature):
|
|
251 |
|
252 |
|
253 |
def test_repr_for_progressive_streams(cipher_signature):
|
254 |
-
stream = str(cipher_signature.streams.filter(progressive=True)
|
255 |
expected = (
|
256 |
'<Stream: itag="18" mime_type="video/mp4" res="360p" fps="30fps" '
|
257 |
'vcodec="avc1.42001E" acodec="mp4a.40.2" progressive="True" type="video">'
|
@@ -260,7 +260,7 @@ def test_repr_for_progressive_streams(cipher_signature):
|
|
260 |
|
261 |
|
262 |
def test_repr_for_adaptive_streams(cipher_signature):
|
263 |
-
stream = str(cipher_signature.streams.filter(adaptive=True)
|
264 |
expected = (
|
265 |
'<Stream: itag="137" mime_type="video/mp4" res="1080p" fps="30fps" '
|
266 |
'vcodec="avc1.640028" progressive="False" type="video">'
|
|
|
11 |
def test_filesize(cipher_signature, mocker):
|
12 |
mocker.patch.object(request, "head")
|
13 |
request.head.return_value = {"content-length": "6796391"}
|
14 |
+
assert cipher_signature.streams[0].filesize == 6796391
|
15 |
|
16 |
|
17 |
def test_filesize_approx(cipher_signature, mocker):
|
18 |
mocker.patch.object(request, "head")
|
19 |
request.head.return_value = {"content-length": "123"}
|
20 |
+
stream = cipher_signature.streams[0]
|
21 |
assert stream.filesize_approx == 22350604
|
22 |
stream.bitrate = None
|
23 |
assert stream.filesize_approx == 123
|
|
|
25 |
|
26 |
def test_default_filename(cipher_signature):
|
27 |
expected = "PSY - GANGNAM STYLE(강남스타일) MV.mp4"
|
28 |
+
stream = cipher_signature.streams[0]
|
29 |
assert stream.default_filename == expected
|
30 |
|
31 |
|
|
|
103 |
mocker.patch.object(request, "stream")
|
104 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
105 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
106 |
+
stream = cipher_signature.streams[0]
|
107 |
stream.download()
|
108 |
|
109 |
|
|
|
114 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
115 |
streams.target_directory = MagicMock(return_value="/target")
|
116 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
117 |
+
stream = cipher_signature.streams[0]
|
118 |
file_path = stream.download(filename_prefix="prefix")
|
119 |
assert file_path == "/target/prefixPSY - GANGNAM STYLE(강남스타일) MV.mp4"
|
120 |
|
|
|
126 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
127 |
streams.target_directory = MagicMock(return_value="/target")
|
128 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
129 |
+
stream = cipher_signature.streams[0]
|
130 |
file_path = stream.download(filename="cool name bro")
|
131 |
assert file_path == "/target/cool name bro.mp4"
|
132 |
|
|
|
139 |
mocker.patch.object(os.path, "isfile")
|
140 |
os.path.isfile.return_value = True
|
141 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
142 |
+
stream = cipher_signature.streams[0]
|
143 |
mocker.patch.object(os.path, "getsize")
|
144 |
os.path.getsize.return_value = stream.filesize
|
145 |
file_path = stream.download()
|
|
|
156 |
mocker.patch.object(os.path, "isfile")
|
157 |
os.path.isfile.return_value = True
|
158 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
159 |
+
stream = cipher_signature.streams[0]
|
160 |
mocker.patch.object(os.path, "getsize")
|
161 |
os.path.getsize.return_value = stream.filesize
|
162 |
file_path = stream.download(skip_existing=False)
|
|
|
165 |
|
166 |
|
167 |
def test_progressive_streams_return_includes_audio_track(cipher_signature):
|
168 |
+
stream = cipher_signature.streams.filter(progressive=True)[0]
|
169 |
assert stream.includes_audio_track
|
170 |
|
171 |
|
172 |
def test_progressive_streams_return_includes_video_track(cipher_signature):
|
173 |
+
stream = cipher_signature.streams.filter(progressive=True)[0]
|
174 |
assert stream.includes_video_track
|
175 |
|
176 |
|
|
|
184 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
185 |
|
186 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
187 |
+
stream = cipher_signature.streams[0]
|
188 |
stream.download()
|
189 |
assert callback_fn.called
|
190 |
args, _ = callback_fn.call_args
|
|
|
203 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
204 |
|
205 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
206 |
+
stream = cipher_signature.streams[0]
|
207 |
stream.download()
|
208 |
assert callback_fn.called
|
209 |
|
|
|
233 |
|
234 |
|
235 |
def test_repr_for_audio_streams(cipher_signature):
|
236 |
+
stream = str(cipher_signature.streams.filter(only_audio=True)[0])
|
237 |
expected = (
|
238 |
'<Stream: itag="140" mime_type="audio/mp4" abr="128kbps" '
|
239 |
'acodec="mp4a.40.2" progressive="False" type="audio">'
|
|
|
242 |
|
243 |
|
244 |
def test_repr_for_video_streams(cipher_signature):
|
245 |
+
stream = str(cipher_signature.streams.filter(only_video=True)[0])
|
246 |
expected = (
|
247 |
'<Stream: itag="137" mime_type="video/mp4" res="1080p" fps="30fps" '
|
248 |
'vcodec="avc1.640028" progressive="False" type="video">'
|
|
|
251 |
|
252 |
|
253 |
def test_repr_for_progressive_streams(cipher_signature):
|
254 |
+
stream = str(cipher_signature.streams.filter(progressive=True)[0])
|
255 |
expected = (
|
256 |
'<Stream: itag="18" mime_type="video/mp4" res="360p" fps="30fps" '
|
257 |
'vcodec="avc1.42001E" acodec="mp4a.40.2" progressive="True" type="video">'
|
|
|
260 |
|
261 |
|
262 |
def test_repr_for_adaptive_streams(cipher_signature):
|
263 |
+
stream = str(cipher_signature.streams.filter(adaptive=True)[0])
|
264 |
expected = (
|
265 |
'<Stream: itag="137" mime_type="video/mp4" res="1080p" fps="30fps" '
|
266 |
'vcodec="avc1.640028" progressive="False" type="video">'
|