hbmartin committed on
Commit
a36c646
·
unverified ·
2 Parent(s): fe4dff7 f7b22d6

Merge pull request #7 from hbmartin/cli-enhancements

Browse files
Files changed (6) hide show
  1. .deepsource.toml +1 -1
  2. README.md +1 -1
  3. pytube/captions.py +66 -1
  4. pytube/cli.py +89 -37
  5. tests/test_captions.py +46 -0
  6. tests/test_cli.py +152 -7
.deepsource.toml CHANGED
@@ -1,7 +1,7 @@
1
  version = 1
2
 
3
  test_patterns = [
4
- "*/tests/**"
5
  ]
6
 
7
  exclude_patterns = [
 
1
  version = 1
2
 
3
  test_patterns = [
4
+ "tests/**"
5
  ]
6
 
7
  exclude_patterns = [
README.md CHANGED
@@ -232,7 +232,7 @@ pytube also ships with a tiny cli interface for downloading and probing videos.
232
  Let's start with downloading:
233
 
234
  ```bash
235
- $ pytube3 http://youtube.com/watch?v=9bZkp7q19f0 --itag=22
236
  ```
237
  To view available streams:
238
 
 
232
  Let's start with downloading:
233
 
234
  ```bash
235
+ $ pytube3 http://youtube.com/watch?v=9bZkp7q19f0 --itag=18
236
  ```
237
  To view available streams:
238
 
pytube/captions.py CHANGED
@@ -1,12 +1,15 @@
1
  # -*- coding: utf-8 -*-
2
  import math
 
3
  import time
4
  import xml.etree.ElementTree as ElementTree
5
- from typing import Dict
6
 
7
  from pytube import request
8
  from html import unescape
9
 
 
 
10
 
11
  class Caption:
12
  """Container for caption tracks."""
@@ -73,6 +76,68 @@ class Caption:
73
  segments.append(line)
74
  return "\n".join(segments).strip()
75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76
  def __repr__(self):
77
  """Printable object representation."""
78
  return '<Caption lang="{s.name}" code="{s.code}">'.format(s=self)
 
1
  # -*- coding: utf-8 -*-
2
  import math
3
+ import os
4
  import time
5
  import xml.etree.ElementTree as ElementTree
6
+ from typing import Dict, Optional
7
 
8
  from pytube import request
9
  from html import unescape
10
 
11
+ from pytube.helpers import safe_filename
12
+
13
 
14
  class Caption:
15
  """Container for caption tracks."""
 
76
  segments.append(line)
77
  return "\n".join(segments).strip()
78
 
79
+ def download(
80
+ self,
81
+ title: str,
82
+ srt: bool = True,
83
+ output_path: Optional[str] = None,
84
+ filename_prefix: Optional[str] = None,
85
+ ) -> str:
86
+ """Write the caption track to disk.
87
+
88
+ :param title:
89
+ Output filename (stem only) for writing the caption file.
90
+ A trailing ".srt" or ".xml" extension is stripped if present.
91
+ :type title: str
92
+ :param srt:
93
+ Set to True to download srt, False to download xml. Defaults to True.
94
+ :type srt: bool
95
+ :param output_path:
96
+ (optional) Output path for writing the caption file. If one is not
97
+ specified, defaults to the current working directory.
98
+ :type output_path: str or None
99
+ :param filename_prefix:
100
+ (optional) A string that will be prepended to the filename.
101
+ For example a number in a playlist or the name of a series.
102
+ If one is not specified, nothing will be prepended.
103
+ This is separate from filename so you can use the default
104
+ filename but still add a prefix.
105
+ :type filename_prefix: str or None
106
+
107
+ :rtype: str
108
+
109
+ """
110
+ output_path = output_path or os.getcwd()
111
+
112
+ if title.endswith(".srt") or title.endswith(".xml"):
113
+ filename = ".".join(title.split(".")[:-1])
114
+ else:
115
+ filename = title
116
+
117
+ if filename_prefix:
118
+ filename = "{prefix}{filename}".format(
119
+ prefix=safe_filename(filename_prefix), filename=filename,
120
+ )
121
+
122
+ filename = safe_filename(filename)
123
+
124
+ filename += " ({})".format(self.code)
125
+
126
+ if srt:
127
+ filename += ".srt"
128
+ else:
129
+ filename += ".xml"
130
+
131
+ file_path = os.path.join(output_path, filename)
132
+
133
+ with open(file_path, "w", encoding="utf-8") as file_handle:
134
+ if srt:
135
+ file_handle.write(self.generate_srt_captions())
136
+ else:
137
+ file_handle.write(self.xml_captions)
138
+
139
+ return file_path
140
+
141
  def __repr__(self):
142
  """Printable object representation."""
143
  return '<Caption lang="{s.name}" code="{s.code}">'.format(s=self)
pytube/cli.py CHANGED
@@ -1,4 +1,4 @@
1
- # -*- coding: utf-8 -*-
2
  """A simple command line application to download youtube videos."""
3
 
4
  import argparse
@@ -9,9 +9,9 @@ import logging
9
  import os
10
  import sys
11
  from io import BufferedWriter
12
- from typing import Tuple, Any
13
 
14
- from pytube import __version__
15
  from pytube import YouTube
16
 
17
 
@@ -22,6 +22,28 @@ def main():
22
  """Command line application to download youtube videos."""
23
  # noinspection PyTypeChecker
24
  parser = argparse.ArgumentParser(description=main.__doc__)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  parser.add_argument("url", help="The YouTube /watch url", nargs="?")
26
  parser.add_argument(
27
  "--version", action="version", version="%(prog)s " + __version__,
@@ -51,45 +73,41 @@ def main():
51
  action="store_true",
52
  help="Save the html and js to disk",
53
  )
 
 
 
 
 
 
 
 
 
 
 
54
 
55
- args = parser.parse_args()
56
- logging.getLogger().setLevel(max(3 - args.verbosity, 0) * 10)
57
-
58
- if not args.url:
59
- parser.print_help()
60
- sys.exit(1)
61
-
62
- if args.list:
63
- display_streams(args.url)
64
-
65
- elif args.build_playback_report:
66
- build_playback_report(args.url)
67
-
68
- elif args.itag:
69
- download(args.url, args.itag)
70
 
71
 
72
- def build_playback_report(url: str) -> None:
73
  """Serialize the request data to json for offline debugging.
74
 
75
- :param str url:
76
- A valid YouTube watch URL.
77
  """
78
- yt = YouTube(url)
79
  ts = int(dt.datetime.utcnow().timestamp())
80
  fp = os.path.join(
81
- os.getcwd(), "yt-video-{yt.video_id}-{ts}.json.gz".format(yt=yt, ts=ts),
82
  )
83
 
84
- js = yt.js
85
- watch_html = yt.watch_html
86
- vid_info = yt.vid_info
87
 
88
  with gzip.open(fp, "wb") as fh:
89
  fh.write(
90
  json.dumps(
91
  {
92
- "url": url,
93
  "js": js,
94
  "watch_html": watch_html,
95
  "video_info": vid_info,
@@ -147,22 +165,25 @@ def on_progress(
147
  display_progress_bar(bytes_received, filesize)
148
 
149
 
150
- def download(url: str, itag: str) -> None:
151
  """Start downloading a YouTube video.
152
 
153
- :param str url:
154
- A valid YouTube watch URL.
155
  :param str itag:
156
  YouTube format identifier code.
157
 
158
  """
159
  # TODO(nficano): allow download target to be specified
160
  # TODO(nficano): allow dash itags to be selected
161
- yt = YouTube(url, on_progress_callback=on_progress)
162
- stream = yt.streams.get_by_itag(int(itag))
163
  if stream is None:
164
- print("Could not find a stream with itag: " + itag)
 
 
165
  sys.exit()
 
 
166
  print("\n{fn} | {fs} bytes".format(fn=stream.default_filename, fs=stream.filesize,))
167
  try:
168
  stream.download()
@@ -171,17 +192,48 @@ def download(url: str, itag: str) -> None:
171
  sys.exit()
172
 
173
 
174
- def display_streams(url: str) -> None:
175
  """Probe YouTube video and lists its available formats.
176
 
177
- :param str url:
178
  A valid YouTube watch URL.
179
 
180
  """
181
- yt = YouTube(url)
182
- for stream in yt.streams.all():
183
  print(stream)
184
 
185
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
186
  if __name__ == "__main__":
187
  main()
 
1
+ #!/usr/bin/env python3
2
  """A simple command line application to download youtube videos."""
3
 
4
  import argparse
 
9
  import os
10
  import sys
11
  from io import BufferedWriter
12
+ from typing import Tuple, Any, Optional, List
13
 
14
+ from pytube import __version__, CaptionQuery
15
  from pytube import YouTube
16
 
17
 
 
22
  """Command line application to download youtube videos."""
23
  # noinspection PyTypeChecker
24
  parser = argparse.ArgumentParser(description=main.__doc__)
25
+ args = _parse_args(parser)
26
+ logging.getLogger().setLevel(max(3 - args.verbosity, 0) * 10)
27
+
28
+ if not args.url:
29
+ parser.print_help()
30
+ sys.exit(1)
31
+
32
+ youtube = YouTube(args.url)
33
+
34
+ if args.list:
35
+ display_streams(youtube)
36
+ if args.build_playback_report:
37
+ build_playback_report(youtube)
38
+ if args.itag:
39
+ download(youtube=youtube, itag=args.itag)
40
+ if hasattr(args, "caption_code"):
41
+ download_caption(youtube=youtube, lang_code=args.caption_code)
42
+
43
+
44
+ def _parse_args(
45
+ parser: argparse.ArgumentParser, args: Optional[List] = None
46
+ ) -> argparse.Namespace:
47
  parser.add_argument("url", help="The YouTube /watch url", nargs="?")
48
  parser.add_argument(
49
  "--version", action="version", version="%(prog)s " + __version__,
 
73
  action="store_true",
74
  help="Save the html and js to disk",
75
  )
76
+ parser.add_argument(
77
+ "-c",
78
+ "--caption-code",
79
+ type=str,
80
+ default=argparse.SUPPRESS,
81
+ nargs="?",
82
+ help=(
83
+ "Download srt captions for given language code. "
84
+ "Prints available language codes if no argument given"
85
+ ),
86
+ )
87
 
88
+ return parser.parse_args(args)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89
 
90
 
91
+ def build_playback_report(youtube: YouTube) -> None:
92
  """Serialize the request data to json for offline debugging.
93
 
94
+ :param YouTube youtube:
95
+ A YouTube object.
96
  """
 
97
  ts = int(dt.datetime.utcnow().timestamp())
98
  fp = os.path.join(
99
+ os.getcwd(), "yt-video-{yt.video_id}-{ts}.json.gz".format(yt=youtube, ts=ts),
100
  )
101
 
102
+ js = youtube.js
103
+ watch_html = youtube.watch_html
104
+ vid_info = youtube.vid_info
105
 
106
  with gzip.open(fp, "wb") as fh:
107
  fh.write(
108
  json.dumps(
109
  {
110
+ "url": youtube.watch_url,
111
  "js": js,
112
  "watch_html": watch_html,
113
  "video_info": vid_info,
 
165
  display_progress_bar(bytes_received, filesize)
166
 
167
 
168
+ def download(youtube: YouTube, itag: int) -> None:
169
  """Start downloading a YouTube video.
170
 
171
+ :param YouTube youtube:
172
+ A valid YouTube object.
173
  :param str itag:
174
  YouTube format identifier code.
175
 
176
  """
177
  # TODO(nficano): allow download target to be specified
178
  # TODO(nficano): allow dash itags to be selected
179
+ stream = youtube.streams.get_by_itag(itag)
 
180
  if stream is None:
181
+ print("Could not find a stream with itag: {itag}".format(itag=itag))
182
+ print("Try one of these:")
183
+ display_streams(youtube)
184
  sys.exit()
185
+
186
+ youtube.register_on_progress_callback(on_progress)
187
  print("\n{fn} | {fs} bytes".format(fn=stream.default_filename, fs=stream.filesize,))
188
  try:
189
  stream.download()
 
192
  sys.exit()
193
 
194
 
195
+ def display_streams(youtube: YouTube) -> None:
196
  """Probe YouTube video and lists its available formats.
197
 
198
+ :param YouTube youtube:
199
  A valid YouTube watch URL.
200
 
201
  """
202
+ for stream in youtube.streams.all():
 
203
  print(stream)
204
 
205
 
206
+ def _print_available_captions(captions: CaptionQuery) -> None:
207
+ print(
208
+ "Available caption codes are: {}".format(
209
+ ", ".join(c.code for c in captions.all())
210
+ )
211
+ )
212
+
213
+
214
+ def download_caption(youtube: YouTube, lang_code: Optional[str]) -> None:
215
+ """Download a caption for the YouTube video.
216
+
217
+ :param YouTube youtube:
218
+ A valid YouTube object.
219
+ :param str lang_code:
220
+ Language code desired for caption file.
221
+ Prints available codes if the value is None
222
+ or the desired code is not available.
223
+
224
+ """
225
+ if lang_code is None:
226
+ _print_available_captions(youtube.captions)
227
+ return
228
+
229
+ caption = youtube.captions.get_by_language_code(lang_code=lang_code)
230
+ if caption:
231
+ downloaded_path = caption.download(title=youtube.title)
232
+ print("Saved caption file to: {}".format(downloaded_path))
233
+ else:
234
+ print("Unable to find caption with code: {}".format(lang_code))
235
+ _print_available_captions(youtube.captions)
236
+
237
+
238
  if __name__ == "__main__":
239
  main()
tests/test_captions.py CHANGED
@@ -1,3 +1,6 @@
 
 
 
1
  from pytube import Caption, CaptionQuery
2
 
3
 
@@ -39,3 +42,46 @@ def test_caption_query_get_by_language_code_when_not_exists():
39
  )
40
  caption_query = CaptionQuery(captions=[caption1, caption2])
41
  assert caption_query.get_by_language_code("hello") is None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from unittest import mock
2
+ from unittest.mock import patch, mock_open
3
+
4
  from pytube import Caption, CaptionQuery
5
 
6
 
 
42
  )
43
  caption_query = CaptionQuery(captions=[caption1, caption2])
44
  assert caption_query.get_by_language_code("hello") is None
45
+
46
+
47
+ @mock.patch("pytube.captions.Caption.generate_srt_captions")
48
+ def test_download(srt):
49
+ open_mock = mock_open()
50
+ with patch("builtins.open", open_mock):
51
+ srt.return_value = ""
52
+ caption = Caption(
53
+ {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
54
+ )
55
+ caption.download("title")
56
+ assert open_mock.call_args_list[0][0][0].split("/")[-1] == "title (en).srt"
57
+
58
+
59
+ @mock.patch("pytube.captions.Caption.generate_srt_captions")
60
+ def test_download_with_prefix(srt):
61
+ open_mock = mock_open()
62
+ with patch("builtins.open", open_mock):
63
+ srt.return_value = ""
64
+ caption = Caption(
65
+ {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
66
+ )
67
+ caption.download("title", filename_prefix="1 ")
68
+ assert open_mock.call_args_list[0][0][0].split("/")[-1] == "1 title (en).srt"
69
+
70
+
71
+ @mock.patch("pytube.captions.Caption.xml_captions")
72
+ def test_download_xml_and_trim_extension(xml):
73
+ open_mock = mock_open()
74
+ with patch("builtins.open", open_mock):
75
+ xml.return_value = ""
76
+ caption = Caption(
77
+ {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
78
+ )
79
+ caption.download("title.xml", srt=False)
80
+ assert open_mock.call_args_list[0][0][0].split("/")[-1] == "title (en).xml"
81
+
82
+
83
+ def test_repr():
84
+ caption = Caption(
85
+ {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
86
+ )
87
+ assert str(caption) == '<Caption lang="name1" code="en">'
tests/test_cli.py CHANGED
@@ -1,13 +1,158 @@
1
  # -*- coding: utf-8 -*-
 
2
  from unittest import mock
 
3
 
4
- from pytube import cli
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5
 
6
 
7
  @mock.patch("pytube.cli.YouTube")
8
- @mock.patch("pytube.cli.sys")
9
- def test_download(MockYouTube, mock_sys):
10
- instance = MockYouTube.return_value
11
- instance.prefetch_descramble.return_value = None
12
- instance.streams = mock.Mock()
13
- cli.download("asdf", 123)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  # -*- coding: utf-8 -*-
2
+ import argparse
3
  from unittest import mock
4
+ from unittest.mock import MagicMock, patch
5
 
6
+ import pytest
7
+
8
+ from pytube import cli, StreamQuery, Caption, CaptionQuery
9
+
10
+ parse_args = cli._parse_args
11
+
12
+
13
+ @mock.patch("pytube.cli.YouTube")
14
+ def test_download_when_itag_not_found(youtube):
15
+ youtube.streams = mock.Mock()
16
+ youtube.streams.all.return_value = []
17
+ youtube.streams.get_by_itag.return_value = None
18
+ with pytest.raises(SystemExit):
19
+ cli.download(youtube, 123)
20
+ youtube.streams.get_by_itag.assert_called_with(123)
21
+
22
+
23
+ @mock.patch("pytube.cli.YouTube")
24
+ @mock.patch("pytube.Stream")
25
+ def test_download_when_itag_is_found(youtube, stream):
26
+ stream.itag = 123
27
+ youtube.streams = StreamQuery([stream])
28
+ with patch.object(
29
+ youtube.streams, "get_by_itag", wraps=youtube.streams.get_by_itag
30
+ ) as wrapped_itag:
31
+ cli.download(youtube, 123)
32
+ wrapped_itag.assert_called_with(123)
33
+ youtube.register_on_progress_callback.assert_called_with(cli.on_progress)
34
+ stream.download.assert_called()
35
 
36
 
37
  @mock.patch("pytube.cli.YouTube")
38
+ @mock.patch("pytube.Stream")
39
+ def test_display_stream(youtube, stream):
40
+ stream.itag = 123
41
+ stream.__repr__ = MagicMock(return_value="")
42
+ youtube.streams = StreamQuery([stream])
43
+ with patch.object(youtube.streams, "all", wraps=youtube.streams.all) as wrapped_all:
44
+ cli.display_streams(youtube)
45
+ wrapped_all.assert_called()
46
+ stream.__repr__.assert_called()
47
+
48
+
49
+ @mock.patch("pytube.cli.YouTube")
50
+ def test_download_caption_with_none(youtube):
51
+ caption = Caption(
52
+ {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
53
+ )
54
+ youtube.captions = CaptionQuery([caption])
55
+ with patch.object(
56
+ youtube.captions, "all", wraps=youtube.captions.all
57
+ ) as wrapped_all:
58
+ cli.download_caption(youtube, None)
59
+ wrapped_all.assert_called()
60
+
61
+
62
+ @mock.patch("pytube.cli.YouTube")
63
+ def test_download_caption_with_language_found(youtube):
64
+ youtube.title = "video title"
65
+ caption = Caption(
66
+ {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
67
+ )
68
+ caption.download = MagicMock(return_value="file_path")
69
+ youtube.captions = CaptionQuery([caption])
70
+ cli.download_caption(youtube, "en")
71
+ caption.download.assert_called_with(title="video title")
72
+
73
+
74
+ @mock.patch("pytube.cli.YouTube")
75
+ def test_download_caption_with_language_not_found(youtube):
76
+ caption = Caption(
77
+ {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
78
+ )
79
+ youtube.captions = CaptionQuery([caption])
80
+ with patch.object(
81
+ youtube.captions, "all", wraps=youtube.captions.all
82
+ ) as wrapped_all:
83
+ cli.download_caption(youtube, "blah")
84
+ wrapped_all.assert_called()
85
+
86
+
87
+ @mock.patch("pytube.Stream")
88
+ @mock.patch("io.BufferedWriter")
89
+ def test_on_progress(stream, writer):
90
+ stream.filesize = 10
91
+ cli.display_progress_bar = MagicMock()
92
+ cli.on_progress(stream, "", writer, 7)
93
+ cli.display_progress_bar.assert_called_once_with(3, 10)
94
+
95
+
96
+ def test_parse_args_falsey():
97
+ parser = argparse.ArgumentParser()
98
+ args = cli._parse_args(parser, ["urlhere"])
99
+ assert args.url == "urlhere"
100
+ assert args.build_playback_report is False
101
+ assert args.itag is None
102
+ assert args.list is False
103
+ assert args.verbosity == 0
104
+
105
+
106
+ def test_parse_args_truthy():
107
+ parser = argparse.ArgumentParser()
108
+ args = cli._parse_args(
109
+ parser, ["urlhere", "--build-playback-report", "-c", "en", "-l", "--itag=10"]
110
+ )
111
+ assert args.url == "urlhere"
112
+ assert args.build_playback_report is True
113
+ assert args.itag == 10
114
+ assert args.list is True
115
+
116
+
117
+ @mock.patch("pytube.cli.YouTube.__init__", return_value=None)
118
+ def test_main_download(youtube):
119
+ parser = argparse.ArgumentParser()
120
+ args = parse_args(parser, ["urlhere", "--itag=10"])
121
+ cli._parse_args = MagicMock(return_value=args)
122
+ cli.download = MagicMock()
123
+ cli.main()
124
+ youtube.assert_called()
125
+ cli.download.assert_called()
126
+
127
+
128
+ @mock.patch("pytube.cli.YouTube.__init__", return_value=None)
129
+ def test_main_build_playback_report(youtube):
130
+ parser = argparse.ArgumentParser()
131
+ args = parse_args(parser, ["urlhere", "--build-playback-report"])
132
+ cli._parse_args = MagicMock(return_value=args)
133
+ cli.build_playback_report = MagicMock()
134
+ cli.main()
135
+ youtube.assert_called()
136
+ cli.build_playback_report.assert_called()
137
+
138
+
139
+ @mock.patch("pytube.cli.YouTube.__init__", return_value=None)
140
+ def test_main_display_streams(youtube):
141
+ parser = argparse.ArgumentParser()
142
+ args = parse_args(parser, ["urlhere", "-l"])
143
+ cli._parse_args = MagicMock(return_value=args)
144
+ cli.display_streams = MagicMock()
145
+ cli.main()
146
+ youtube.assert_called()
147
+ cli.display_streams.assert_called()
148
+
149
+
150
+ @mock.patch("pytube.cli.YouTube.__init__", return_value=None)
151
+ def test_main_download_caption(youtube):
152
+ parser = argparse.ArgumentParser()
153
+ args = parse_args(parser, ["urlhere", "-c"])
154
+ cli._parse_args = MagicMock(return_value=args)
155
+ cli.download_caption = MagicMock()
156
+ cli.main()
157
+ youtube.assert_called()
158
+ cli.download_caption.assert_called()