hbmartin commited on
Commit
e856a54
·
unverified ·
2 Parent(s): 51df399 e10b0fd

Merge pull request #52 from hbmartin/sequence-objects

Browse files

Playlist, StreamQuery, and CaptionQuery implement Sequence

README.md CHANGED
@@ -36,14 +36,12 @@ $ pip install pytube3 --upgrade
36
  ## Quick start
37
  ```python
38
  >>> from pytube import YouTube
39
- >>> YouTube('https://youtu.be/9bZkp7q19f0').streams.first().download()
40
  >>>
41
  >>> yt = YouTube('http://youtube.com/watch?v=9bZkp7q19f0')
42
  >>> yt.streams
43
  ... .filter(progressive=True, file_extension='mp4')
44
- ... .order_by('resolution')
45
- ... .desc()
46
- ... .first()
47
  ... .download()
48
  ```
49
 
@@ -64,7 +62,7 @@ Let's begin with showing how easy it is to download a video with pytube:
64
 
65
  ```python
66
  >>> from pytube import YouTube
67
- >>> YouTube('http://youtube.com/watch?v=9bZkp7q19f0').streams.first().download()
68
  ```
69
  This example will download the highest quality progressive download stream available.
70
 
@@ -72,7 +70,7 @@ Next, let's explore how we would view what video streams are available:
72
 
73
  ```python
74
  >>> yt = YouTube('http://youtube.com/watch?v=9bZkp7q19f0')
75
- >>> yt.streams.all()
76
  [<Stream: itag="22" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.64001F" acodec="mp4a.40.2">,
77
  <Stream: itag="43" mime_type="video/webm" res="360p" fps="30fps" vcodec="vp8.0" acodec="vorbis">,
78
  <Stream: itag="18" mime_type="video/mp4" res="360p" fps="30fps" vcodec="avc1.42001E" acodec="mp4a.40.2">,
@@ -108,7 +106,7 @@ The legacy streams that contain the audio and video in a single file (referred t
108
  To only view these progressive download streams:
109
 
110
  ```python
111
- >>> yt.streams.filter(progressive=True).all()
112
  [<Stream: itag="22" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.64001F" acodec="mp4a.40.2">,
113
  <Stream: itag="43" mime_type="video/webm" res="360p" fps="30fps" vcodec="vp8.0" acodec="vorbis">,
114
  <Stream: itag="18" mime_type="video/mp4" res="360p" fps="30fps" vcodec="avc1.42001E" acodec="mp4a.40.2">,
@@ -119,7 +117,7 @@ To only view these progressive download streams:
119
  Conversely, if you only want to see the DASH streams (also referred to as "adaptive") you can do:
120
 
121
  ```python
122
- >>> yt.streams.filter(adaptive=True).all()
123
  [<Stream: itag="137" mime_type="video/mp4" res="1080p" fps="30fps" vcodec="avc1.640028">,
124
  <Stream: itag="248" mime_type="video/webm" res="1080p" fps="30fps" vcodec="vp9">,
125
  <Stream: itag="136" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.4d401f">,
@@ -146,7 +144,7 @@ You can also download a complete Youtube playlist:
146
  ```python
147
  >>> from pytube import Playlist
148
  >>> playlist = Playlist("https://www.youtube.com/playlist?list=PLynhp4cZEpTbRs_PYISQ8v_uwO0_mDg_X")
149
- >>> for video in playlist.videos:
150
  >>> video.streams.get_highest_resolution().download()
151
  ```
152
  This will download the highest progressive stream available (generally 720p) from the given playlist.
@@ -158,7 +156,7 @@ Pytube allows you to filter on every property available (see the documentation f
158
  To list the audio only streams:
159
 
160
  ```python
161
- >>> yt.streams.filter(only_audio=True).all()
162
  [<Stream: itag="140" mime_type="audio/mp4" abr="128kbps" acodec="mp4a.40.2">,
163
  <Stream: itag="171" mime_type="audio/webm" abr="128kbps" acodec="vorbis">,
164
  <Stream: itag="249" mime_type="audio/webm" abr="50kbps" acodec="opus">,
@@ -169,7 +167,7 @@ To list the audio only streams:
169
  To list only ``mp4`` streams:
170
 
171
  ```python
172
- >>> yt.streams.filter(subtype='mp4').all()
173
  [<Stream: itag="22" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.64001F" acodec="mp4a.40.2">,
174
  <Stream: itag="18" mime_type="video/mp4" res="360p" fps="30fps" vcodec="avc1.42001E" acodec="mp4a.40.2">,
175
  <Stream: itag="137" mime_type="video/mp4" res="1080p" fps="30fps" vcodec="avc1.640028">,
@@ -184,9 +182,9 @@ To list only ``mp4`` streams:
184
  Multiple filters can also be specified:
185
 
186
  ```python
187
- >>> yt.streams.filter(subtype='mp4', progressive=True).all()
188
  >>> # this can also be expressed as:
189
- >>> yt.streams.filter(subtype='mp4').filter(progressive=True).all()
190
  [<Stream: itag="22" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.64001F" acodec="mp4a.40.2">,
191
  <Stream: itag="18" mime_type="video/mp4" res="360p" fps="30fps" vcodec="avc1.42001E" acodec="mp4a.40.2">]
192
  ```
@@ -200,7 +198,7 @@ You also have an interface to select streams by their itag, without needing to f
200
  If you need to optimize for a specific feature, such as the "highest resolution" or "lowest average bitrate":
201
 
202
  ```python
203
- >>> yt.streams.filter(progressive=True).order_by('resolution').desc().all()
204
  ```
205
  Note: Using ``order_by`` on a given attribute will filter out all streams missing that attribute.
206
 
 
36
  ## Quick start
37
  ```python
38
  >>> from pytube import YouTube
39
+ >>> YouTube('https://youtu.be/9bZkp7q19f0').streams[0].download()
40
  >>>
41
  >>> yt = YouTube('http://youtube.com/watch?v=9bZkp7q19f0')
42
  >>> yt.streams
43
  ... .filter(progressive=True, file_extension='mp4')
44
+ ... .order_by('resolution')[-1]
 
 
45
  ... .download()
46
  ```
47
 
 
62
 
63
  ```python
64
  >>> from pytube import YouTube
65
+ >>> YouTube('http://youtube.com/watch?v=9bZkp7q19f0').streams[0].download()
66
  ```
67
  This example will download the highest quality progressive download stream available.
68
 
 
70
 
71
  ```python
72
  >>> yt = YouTube('http://youtube.com/watch?v=9bZkp7q19f0')
73
+ >>> print(yt.streams)
74
  [<Stream: itag="22" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.64001F" acodec="mp4a.40.2">,
75
  <Stream: itag="43" mime_type="video/webm" res="360p" fps="30fps" vcodec="vp8.0" acodec="vorbis">,
76
  <Stream: itag="18" mime_type="video/mp4" res="360p" fps="30fps" vcodec="avc1.42001E" acodec="mp4a.40.2">,
 
106
  To only view these progressive download streams:
107
 
108
  ```python
109
+ >>> yt.streams.filter(progressive=True)
110
  [<Stream: itag="22" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.64001F" acodec="mp4a.40.2">,
111
  <Stream: itag="43" mime_type="video/webm" res="360p" fps="30fps" vcodec="vp8.0" acodec="vorbis">,
112
  <Stream: itag="18" mime_type="video/mp4" res="360p" fps="30fps" vcodec="avc1.42001E" acodec="mp4a.40.2">,
 
117
  Conversely, if you only want to see the DASH streams (also referred to as "adaptive") you can do:
118
 
119
  ```python
120
+ >>> yt.streams.filter(adaptive=True)
121
  [<Stream: itag="137" mime_type="video/mp4" res="1080p" fps="30fps" vcodec="avc1.640028">,
122
  <Stream: itag="248" mime_type="video/webm" res="1080p" fps="30fps" vcodec="vp9">,
123
  <Stream: itag="136" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.4d401f">,
 
144
  ```python
145
  >>> from pytube import Playlist
146
  >>> playlist = Playlist("https://www.youtube.com/playlist?list=PLynhp4cZEpTbRs_PYISQ8v_uwO0_mDg_X")
147
+ >>> for video in playlist:
148
  >>> video.streams.get_highest_resolution().download()
149
  ```
150
  This will download the highest progressive stream available (generally 720p) from the given playlist.
 
156
  To list the audio only streams:
157
 
158
  ```python
159
+ >>> yt.streams.filter(only_audio=True)
160
  [<Stream: itag="140" mime_type="audio/mp4" abr="128kbps" acodec="mp4a.40.2">,
161
  <Stream: itag="171" mime_type="audio/webm" abr="128kbps" acodec="vorbis">,
162
  <Stream: itag="249" mime_type="audio/webm" abr="50kbps" acodec="opus">,
 
167
  To list only ``mp4`` streams:
168
 
169
  ```python
170
+ >>> yt.streams.filter(subtype='mp4')
171
  [<Stream: itag="22" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.64001F" acodec="mp4a.40.2">,
172
  <Stream: itag="18" mime_type="video/mp4" res="360p" fps="30fps" vcodec="avc1.42001E" acodec="mp4a.40.2">,
173
  <Stream: itag="137" mime_type="video/mp4" res="1080p" fps="30fps" vcodec="avc1.640028">,
 
182
  Multiple filters can also be specified:
183
 
184
  ```python
185
+ >>> yt.streams.filter(subtype='mp4', progressive=True)
186
  >>> # this can also be expressed as:
187
+ >>> yt.streams.filter(subtype='mp4').filter(progressive=True)
188
  [<Stream: itag="22" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.64001F" acodec="mp4a.40.2">,
189
  <Stream: itag="18" mime_type="video/mp4" res="360p" fps="30fps" vcodec="avc1.42001E" acodec="mp4a.40.2">]
190
  ```
 
198
  If you need to optimize for a specific feature, such as the "highest resolution" or "lowest average bitrate":
199
 
200
  ```python
201
+ >>> yt.streams.filter(progressive=True).order_by('resolution').desc()
202
  ```
203
  Note: Using ``order_by`` on a given attribute will filter out all streams missing that attribute.
204
 
pytube/__main__.py CHANGED
@@ -22,7 +22,7 @@ from pytube import Stream
22
  from pytube import StreamQuery
23
  from pytube.extract import apply_descrambler, apply_signature, get_ytplayer_config
24
  from pytube.helpers import install_proxy
25
- from pytube.exceptions import VideoUnavailable
26
  from pytube.monostate import OnProgress, OnComplete, Monostate
27
 
28
  logger = logging.getLogger(__name__)
@@ -131,17 +131,13 @@ class YouTube:
131
  apply_descrambler(self.vid_info, fmt)
132
  apply_descrambler(self.player_config_args, fmt)
133
 
134
- try:
135
- apply_signature(
136
- self.player_config_args, fmt, self.js # type: ignore
137
- )
138
- except TypeError:
139
  if not self.embed_html:
140
  self.embed_html = request.get(url=self.embed_url)
141
  self.js_url = extract.js_url(self.embed_html)
142
  self.js = request.get(self.js_url)
143
- assert self.js is not None
144
- apply_signature(self.player_config_args, fmt, self.js)
145
 
146
  # build instances of :class:`Stream <Stream>`
147
  self.initialize_stream_objects(fmt)
@@ -162,17 +158,17 @@ class YouTube:
162
  which blocks for long periods of time.
163
 
164
  :rtype: None
165
-
166
  """
167
  self.watch_html = request.get(url=self.watch_url)
168
  if self.watch_html is None:
169
  raise VideoUnavailable(video_id=self.video_id)
170
  self.age_restricted = extract.is_age_restricted(self.watch_html)
171
- if not self.age_restricted and (
172
- "yt-badge-live" in self.watch_html
173
- or "This video is private" in self.watch_html
174
- ):
175
- raise VideoUnavailable(video_id=self.video_id)
 
176
 
177
  if self.age_restricted:
178
  if not self.embed_html:
 
22
  from pytube import StreamQuery
23
  from pytube.extract import apply_descrambler, apply_signature, get_ytplayer_config
24
  from pytube.helpers import install_proxy
25
+ from pytube.exceptions import VideoUnavailable, LiveStreamError
26
  from pytube.monostate import OnProgress, OnComplete, Monostate
27
 
28
  logger = logging.getLogger(__name__)
 
131
  apply_descrambler(self.vid_info, fmt)
132
  apply_descrambler(self.player_config_args, fmt)
133
 
134
+ if not self.js:
 
 
 
 
135
  if not self.embed_html:
136
  self.embed_html = request.get(url=self.embed_url)
137
  self.js_url = extract.js_url(self.embed_html)
138
  self.js = request.get(self.js_url)
139
+
140
+ apply_signature(self.player_config_args, fmt, self.js)
141
 
142
  # build instances of :class:`Stream <Stream>`
143
  self.initialize_stream_objects(fmt)
 
158
  which blocks for long periods of time.
159
 
160
  :rtype: None
 
161
  """
162
  self.watch_html = request.get(url=self.watch_url)
163
  if self.watch_html is None:
164
  raise VideoUnavailable(video_id=self.video_id)
165
  self.age_restricted = extract.is_age_restricted(self.watch_html)
166
+
167
+ if not self.age_restricted:
168
+ if "yt-badge-live" in self.watch_html:
169
+ raise LiveStreamError(self.video_id)
170
+ if "This video is private" in self.watch_html:
171
+ raise VideoUnavailable(video_id=self.video_id)
172
 
173
  if self.age_restricted:
174
  if not self.embed_html:
pytube/cli.py CHANGED
@@ -1,4 +1,5 @@
1
  #!/usr/bin/env python3
 
2
  """A simple command line application to download youtube videos."""
3
 
4
  import argparse
@@ -412,12 +413,12 @@ def display_streams(youtube: YouTube) -> None:
412
  A valid YouTube watch URL.
413
 
414
  """
415
- for stream in youtube.streams.all():
416
  print(stream)
417
 
418
 
419
  def _print_available_captions(captions: CaptionQuery) -> None:
420
- print(f"Available caption codes are: {', '.join(c.code for c in captions.all())}")
421
 
422
 
423
  def download_caption(
 
1
  #!/usr/bin/env python3
2
+ # -*- coding: utf-8 -*-
3
  """A simple command line application to download youtube videos."""
4
 
5
  import argparse
 
413
  A valid YouTube watch URL.
414
 
415
  """
416
+ for stream in youtube.streams:
417
  print(stream)
418
 
419
 
420
  def _print_available_captions(captions: CaptionQuery) -> None:
421
+ print(f"Available caption codes are: {', '.join(c.code for c in captions)}")
422
 
423
 
424
  def download_caption(
pytube/contrib/playlist.py CHANGED
@@ -6,8 +6,9 @@ import json
6
  import logging
7
  import re
8
  from datetime import date, datetime
9
- from typing import List, Optional, Iterable, Dict
10
  from urllib.parse import parse_qs
 
11
 
12
  from pytube import request, YouTube
13
  from pytube.helpers import cache, deprecated, install_proxy, uniqueify
@@ -15,7 +16,7 @@ from pytube.helpers import cache, deprecated, install_proxy, uniqueify
15
  logger = logging.getLogger(__name__)
16
 
17
 
18
- class Playlist:
19
  """Load a YouTube playlist with URL or ID"""
20
 
21
  def __init__(self, url: str, proxies: Optional[Dict[str, str]] = None):
@@ -142,10 +143,19 @@ class Playlist:
142
  """
143
  yield from (YouTube(url) for url in self.video_urls)
144
 
 
 
 
 
 
 
 
 
 
145
  @deprecated(
146
  "This call is unnecessary, you can directly access .video_urls or .videos"
147
  )
148
- def populate_video_urls(self) -> List[str]:
149
  """Complete links of all the videos in playlist
150
 
151
  :rtype: List[str]
 
6
  import logging
7
  import re
8
  from datetime import date, datetime
9
+ from typing import List, Optional, Iterable, Dict, Union
10
  from urllib.parse import parse_qs
11
+ from collections.abc import Sequence
12
 
13
  from pytube import request, YouTube
14
  from pytube.helpers import cache, deprecated, install_proxy, uniqueify
 
16
  logger = logging.getLogger(__name__)
17
 
18
 
19
+ class Playlist(Sequence):
20
  """Load a YouTube playlist with URL or ID"""
21
 
22
  def __init__(self, url: str, proxies: Optional[Dict[str, str]] = None):
 
143
  """
144
  yield from (YouTube(url) for url in self.video_urls)
145
 
146
+ def __getitem__(self, i: Union[slice, int]) -> Union[str, List[str]]:
147
+ return self.video_urls[i]
148
+
149
+ def __len__(self) -> int:
150
+ return len(self.video_urls)
151
+
152
+ def __repr__(self) -> str:
153
+ return f"{self.video_urls}"
154
+
155
  @deprecated(
156
  "This call is unnecessary, you can directly access .video_urls or .videos"
157
  )
158
+ def populate_video_urls(self) -> List[str]: # pragma: no cover
159
  """Complete links of all the videos in playlist
160
 
161
  :rtype: List[str]
pytube/exceptions.py CHANGED
@@ -35,6 +35,15 @@ class RegexMatchError(ExtractError):
35
  class LiveStreamError(ExtractError):
36
  """Video is a live stream."""
37
 
 
 
 
 
 
 
 
 
 
38
 
39
  class VideoUnavailable(PytubeError):
40
  """Video is unavailable."""
 
35
  class LiveStreamError(ExtractError):
36
  """Video is a live stream."""
37
 
38
+ def __init__(self, video_id: str):
39
+ """
40
+ :param str video_id:
41
+ A YouTube video identifier.
42
+ """
43
+ super().__init__(f"{video_id} is streaming live and cannot be loaded")
44
+
45
+ self.video_id = video_id
46
+
47
 
48
  class VideoUnavailable(PytubeError):
49
  """Video is unavailable."""
pytube/extract.py CHANGED
@@ -231,7 +231,7 @@ def apply_signature(config_args: Dict, fmt: str, js: str) -> None:
231
  url: str = stream["url"]
232
  except KeyError:
233
  if live_stream:
234
- raise LiveStreamError("Video is currently being streamed live")
235
  # 403 Forbidden fix.
236
  if "signature" in url or (
237
  "s" not in stream and ("&sig=" in url or "&lsig=" in url)
@@ -242,12 +242,7 @@ def apply_signature(config_args: Dict, fmt: str, js: str) -> None:
242
  logger.debug("signature found, skip decipher")
243
  continue
244
 
245
- if js is not None:
246
- signature = cipher.get_signature(ciphered_signature=stream["s"])
247
- else:
248
- # signature not present in url (line 33), need js to descramble
249
- # TypeError caught in __main__
250
- raise TypeError("JS is None")
251
 
252
  logger.debug("finished descrambling signature for itag=%s", stream["itag"])
253
  # 403 forbidden fix
 
231
  url: str = stream["url"]
232
  except KeyError:
233
  if live_stream:
234
+ raise LiveStreamError("UNKNOWN")
235
  # 403 Forbidden fix.
236
  if "signature" in url or (
237
  "s" not in stream and ("&sig=" in url or "&lsig=" in url)
 
242
  logger.debug("signature found, skip decipher")
243
  continue
244
 
245
+ signature = cipher.get_signature(ciphered_signature=stream["s"])
 
 
 
 
 
246
 
247
  logger.debug("finished descrambling signature for itag=%s", stream["itag"])
248
  # 403 forbidden fix
pytube/query.py CHANGED
@@ -1,12 +1,14 @@
1
  # -*- coding: utf-8 -*-
2
 
3
  """This module provides a query interface for media streams and captions."""
4
- from typing import List, Optional, Callable
 
5
 
6
  from pytube import Stream, Caption
 
7
 
8
 
9
- class StreamQuery:
10
  """Interface for querying the available media streams."""
11
 
12
  def __init__(self, fmt_streams):
@@ -324,15 +326,19 @@ class StreamQuery:
324
  except IndexError:
325
  pass
326
 
327
- def count(self) -> int:
328
- """Get the count the query would return.
 
329
 
330
  :rtype: int
331
-
332
  """
333
- return len(self.fmt_streams)
 
 
 
334
 
335
- def all(self) -> List[Stream]:
 
336
  """Get all the results represented by this query as a list.
337
 
338
  :rtype: list
@@ -340,8 +346,17 @@ class StreamQuery:
340
  """
341
  return self.fmt_streams
342
 
 
 
 
 
 
 
 
 
343
 
344
- class CaptionQuery:
 
345
  """Interface for querying the available captions."""
346
 
347
  def __init__(self, captions: List[Caption]):
@@ -351,9 +366,9 @@ class CaptionQuery:
351
  list of :class:`Caption <Caption>` instances.
352
 
353
  """
354
- self.captions = captions
355
  self.lang_code_index = {c.code: c for c in captions}
356
 
 
357
  def get_by_language_code(self, lang_code: str) -> Optional[Caption]:
358
  """Get the :class:`Caption <Caption>` for a given ``lang_code``.
359
 
@@ -366,10 +381,23 @@ class CaptionQuery:
366
  """
367
  return self.lang_code_index.get(lang_code)
368
 
369
- def all(self) -> List[Caption]:
 
370
  """Get all the results represented by this query as a list.
371
 
372
  :rtype: list
373
 
374
  """
375
- return self.captions
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  # -*- coding: utf-8 -*-
2
 
3
  """This module provides a query interface for media streams and captions."""
4
+ from typing import Callable, List, Optional, Union
5
+ from collections.abc import Mapping, Sequence
6
 
7
  from pytube import Stream, Caption
8
+ from pytube.helpers import deprecated
9
 
10
 
11
+ class StreamQuery(Sequence):
12
  """Interface for querying the available media streams."""
13
 
14
  def __init__(self, fmt_streams):
 
326
  except IndexError:
327
  pass
328
 
329
+ @deprecated("Get the size of this list directly using len()")
330
+ def count(self, value: Optional[str] = None) -> int: # pragma: no cover
331
+ """Get the count of items in the list.
332
 
333
  :rtype: int
 
334
  """
335
+ if value:
336
+ return self.fmt_streams.count(value)
337
+
338
+ return len(self)
339
 
340
+ @deprecated("This object can be treated as a list, all() is useless")
341
+ def all(self) -> List[Stream]: # pragma: no cover
342
  """Get all the results represented by this query as a list.
343
 
344
  :rtype: list
 
346
  """
347
  return self.fmt_streams
348
 
349
+ def __getitem__(self, i: Union[slice, int]):
350
+ return self.fmt_streams[i]
351
+
352
+ def __len__(self) -> int:
353
+ return len(self.fmt_streams)
354
+
355
+ def __repr__(self) -> str:
356
+ return f"{self.fmt_streams}"
357
 
358
+
359
+ class CaptionQuery(Mapping):
360
  """Interface for querying the available captions."""
361
 
362
  def __init__(self, captions: List[Caption]):
 
366
  list of :class:`Caption <Caption>` instances.
367
 
368
  """
 
369
  self.lang_code_index = {c.code: c for c in captions}
370
 
371
+ @deprecated("This object can be treated as a dictionary, i.e. captions['en']")
372
  def get_by_language_code(self, lang_code: str) -> Optional[Caption]:
373
  """Get the :class:`Caption <Caption>` for a given ``lang_code``.
374
 
 
381
  """
382
  return self.lang_code_index.get(lang_code)
383
 
384
+ @deprecated("This object can be treated as a dictionary")
385
+ def all(self) -> List[Caption]: # pragma: no cover
386
  """Get all the results represented by this query as a list.
387
 
388
  :rtype: list
389
 
390
  """
391
+ return list(self.lang_code_index.values())
392
+
393
+ def __getitem__(self, i: str):
394
+ return self.lang_code_index[i]
395
+
396
+ def __len__(self) -> int:
397
+ return len(self.lang_code_index)
398
+
399
+ def __iter__(self):
400
+ return iter(self.lang_code_index)
401
+
402
+ def __repr__(self) -> str:
403
+ return f"{self.lang_code_index}"
tests/contrib/test_playlist.py CHANGED
@@ -81,6 +81,39 @@ def test_video_urls(request_get, playlist_html):
81
  ]
82
 
83
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
  @mock.patch("pytube.contrib.playlist.request.get")
85
  @mock.patch("pytube.cli.YouTube.__init__", return_value=None)
86
  def test_videos(youtube, request_get, playlist_html):
 
81
  ]
82
 
83
 
84
+ @mock.patch("pytube.contrib.playlist.request.get")
85
+ def test_repr(request_get, playlist_html):
86
+ url = "https://www.fakeurl.com/playlist?list=whatever"
87
+ request_get.return_value = playlist_html
88
+ playlist = Playlist(url)
89
+ playlist._find_load_more_url = MagicMock(return_value=None)
90
+ request_get.assert_called()
91
+ assert (
92
+ repr(playlist) == "['https://www.youtube.com/watch?v=ujTCoH21GlA', "
93
+ "'https://www.youtube.com/watch?v=45ryDIPHdGg', "
94
+ "'https://www.youtube.com/watch?v=1BYu65vLKdA', "
95
+ "'https://www.youtube.com/watch?v=3AQ_74xrch8', "
96
+ "'https://www.youtube.com/watch?v=ddqQUz9mZaM', "
97
+ "'https://www.youtube.com/watch?v=vwLT6bZrHEE', "
98
+ "'https://www.youtube.com/watch?v=TQKI0KE-JYY', "
99
+ "'https://www.youtube.com/watch?v=dNBvQ38MlT8', "
100
+ "'https://www.youtube.com/watch?v=JHxyrMgOUWI', "
101
+ "'https://www.youtube.com/watch?v=l2I8NycJMCY', "
102
+ "'https://www.youtube.com/watch?v=g1Zbuk1gAfk', "
103
+ "'https://www.youtube.com/watch?v=zixd-si9Q-o']"
104
+ )
105
+
106
+
107
+ @mock.patch("pytube.contrib.playlist.request.get")
108
+ def test_sequence(request_get, playlist_html):
109
+ url = "https://www.fakeurl.com/playlist?list=whatever"
110
+ request_get.return_value = playlist_html
111
+ playlist = Playlist(url)
112
+ playlist._find_load_more_url = MagicMock(return_value=None)
113
+ assert playlist[0] == "https://www.youtube.com/watch?v=ujTCoH21GlA"
114
+ assert len(playlist) == 12
115
+
116
+
117
  @mock.patch("pytube.contrib.playlist.request.get")
118
  @mock.patch("pytube.cli.YouTube.__init__", return_value=None)
119
  def test_videos(youtube, request_get, playlist_html):
tests/test_captions.py CHANGED
@@ -2,6 +2,8 @@
2
  from unittest import mock
3
  from unittest.mock import patch, mock_open, MagicMock
4
 
 
 
5
  from pytube import Caption, CaptionQuery, captions
6
 
7
 
@@ -12,7 +14,7 @@ def test_float_to_srt_time_format():
12
  assert caption1.float_to_srt_time_format(3.89) == "00:00:03,890"
13
 
14
 
15
- def test_caption_query_all():
16
  caption1 = Caption(
17
  {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
18
  )
@@ -20,7 +22,11 @@ def test_caption_query_all():
20
  {"url": "url2", "name": {"simpleText": "name2"}, "languageCode": "fr"}
21
  )
22
  caption_query = CaptionQuery(captions=[caption1, caption2])
23
- assert caption_query.captions == [caption1, caption2]
 
 
 
 
24
 
25
 
26
  def test_caption_query_get_by_language_code_when_exists():
@@ -101,6 +107,9 @@ def test_repr():
101
  )
102
  assert str(caption) == '<Caption lang="name1" code="en">'
103
 
 
 
 
104
 
105
  @mock.patch("pytube.request.get")
106
  def test_xml_captions(request_get):
 
2
  from unittest import mock
3
  from unittest.mock import patch, mock_open, MagicMock
4
 
5
+ import pytest
6
+
7
  from pytube import Caption, CaptionQuery, captions
8
 
9
 
 
14
  assert caption1.float_to_srt_time_format(3.89) == "00:00:03,890"
15
 
16
 
17
+ def test_caption_query_sequence():
18
  caption1 = Caption(
19
  {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
20
  )
 
22
  {"url": "url2", "name": {"simpleText": "name2"}, "languageCode": "fr"}
23
  )
24
  caption_query = CaptionQuery(captions=[caption1, caption2])
25
+ assert len(caption_query) == 2
26
+ assert caption_query["en"] == caption1
27
+ assert caption_query["fr"] == caption2
28
+ with pytest.raises(KeyError):
29
+ caption_query["nada"]
30
 
31
 
32
  def test_caption_query_get_by_language_code_when_exists():
 
107
  )
108
  assert str(caption) == '<Caption lang="name1" code="en">'
109
 
110
+ caption_query = CaptionQuery(captions=[caption])
111
+ assert repr(caption_query) == '{\'en\': <Caption lang="name1" code="en">}'
112
+
113
 
114
  @mock.patch("pytube.request.get")
115
  def test_xml_captions(request_get):
tests/test_cli.py CHANGED
@@ -10,14 +10,18 @@ from pytube import cli, StreamQuery, Caption, CaptionQuery
10
  parse_args = cli._parse_args
11
 
12
 
 
13
  @mock.patch("pytube.cli.YouTube")
14
- def test_download_when_itag_not_found(youtube):
 
15
  youtube.streams = mock.Mock()
16
- youtube.streams.all.return_value = []
17
  youtube.streams.get_by_itag.return_value = None
 
18
  with pytest.raises(SystemExit):
19
  cli.download_by_itag(youtube, 123)
 
20
  youtube.streams.get_by_itag.assert_called_with(123)
 
21
 
22
 
23
  @mock.patch("pytube.cli.YouTube")
@@ -37,26 +41,28 @@ def test_download_when_itag_is_found(youtube, stream):
37
  @mock.patch("pytube.cli.YouTube")
38
  @mock.patch("pytube.Stream")
39
  def test_display_stream(youtube, stream):
 
40
  stream.itag = 123
41
  stream.__repr__ = MagicMock(return_value="")
42
  youtube.streams = StreamQuery([stream])
43
- with patch.object(youtube.streams, "all", wraps=youtube.streams.all) as wrapped_all:
44
- cli.display_streams(youtube)
45
- wrapped_all.assert_called()
46
- stream.__repr__.assert_called()
47
 
48
 
 
49
  @mock.patch("pytube.cli.YouTube")
50
- def test_download_caption_with_none(youtube):
 
51
  caption = Caption(
52
  {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
53
  )
54
  youtube.captions = CaptionQuery([caption])
55
- with patch.object(
56
- youtube.captions, "all", wraps=youtube.captions.all
57
- ) as wrapped_all:
58
- cli.download_caption(youtube, None)
59
- wrapped_all.assert_called()
60
 
61
 
62
  @mock.patch("pytube.cli.YouTube")
@@ -71,17 +77,18 @@ def test_download_caption_with_language_found(youtube):
71
  caption.download.assert_called_with(title="video title", output_path=None)
72
 
73
 
 
74
  @mock.patch("pytube.cli.YouTube")
75
- def test_download_caption_with_language_not_found(youtube):
 
76
  caption = Caption(
77
  {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
78
  )
79
  youtube.captions = CaptionQuery([caption])
80
- with patch.object(
81
- youtube.captions, "all", wraps=youtube.captions.all
82
- ) as wrapped_all:
83
- cli.download_caption(youtube, "blah")
84
- wrapped_all.assert_called()
85
 
86
 
87
  def test_display_progress_bar(capsys):
 
10
  parse_args = cli._parse_args
11
 
12
 
13
+ @mock.patch("pytube.cli.display_streams")
14
  @mock.patch("pytube.cli.YouTube")
15
+ def test_download_when_itag_not_found(youtube, display_streams):
16
+ # Given
17
  youtube.streams = mock.Mock()
 
18
  youtube.streams.get_by_itag.return_value = None
19
+ # When
20
  with pytest.raises(SystemExit):
21
  cli.download_by_itag(youtube, 123)
22
+ # Then
23
  youtube.streams.get_by_itag.assert_called_with(123)
24
+ display_streams.assert_called_with(youtube)
25
 
26
 
27
  @mock.patch("pytube.cli.YouTube")
 
41
  @mock.patch("pytube.cli.YouTube")
42
  @mock.patch("pytube.Stream")
43
  def test_display_stream(youtube, stream):
44
+ # Given
45
  stream.itag = 123
46
  stream.__repr__ = MagicMock(return_value="")
47
  youtube.streams = StreamQuery([stream])
48
+ # When
49
+ cli.display_streams(youtube)
50
+ # Then
51
+ stream.__repr__.assert_called()
52
 
53
 
54
+ @mock.patch("pytube.cli._print_available_captions")
55
  @mock.patch("pytube.cli.YouTube")
56
+ def test_download_caption_with_none(youtube, print_available):
57
+ # Given
58
  caption = Caption(
59
  {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
60
  )
61
  youtube.captions = CaptionQuery([caption])
62
+ # When
63
+ cli.download_caption(youtube, None)
64
+ # Then
65
+ print_available.assert_called_with(youtube.captions)
 
66
 
67
 
68
  @mock.patch("pytube.cli.YouTube")
 
77
  caption.download.assert_called_with(title="video title", output_path=None)
78
 
79
 
80
+ @mock.patch("pytube.cli._print_available_captions")
81
  @mock.patch("pytube.cli.YouTube")
82
+ def test_download_caption_with_lang_not_found(youtube, print_available):
83
+ # Given
84
  caption = Caption(
85
  {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"}
86
  )
87
  youtube.captions = CaptionQuery([caption])
88
+ # When
89
+ cli.download_caption(youtube, "blah")
90
+ # Then
91
+ print_available.assert_called_with(youtube.captions)
 
92
 
93
 
94
  def test_display_progress_bar(capsys):
tests/test_exceptions.py CHANGED
@@ -1,5 +1,5 @@
1
  # -*- coding: utf-8 -*-
2
- from pytube.exceptions import VideoUnavailable, RegexMatchError
3
 
4
 
5
  def test_video_unavailable():
@@ -15,3 +15,11 @@ def test_regex_match_error():
15
  raise RegexMatchError(caller="hello", pattern="*")
16
  except RegexMatchError as e:
17
  assert str(e) == "hello: could not find match for *"
 
 
 
 
 
 
 
 
 
1
  # -*- coding: utf-8 -*-
2
+ from pytube.exceptions import VideoUnavailable, RegexMatchError, LiveStreamError
3
 
4
 
5
  def test_video_unavailable():
 
15
  raise RegexMatchError(caller="hello", pattern="*")
16
  except RegexMatchError as e:
17
  assert str(e) == "hello: could not find match for *"
18
+
19
+
20
+ def test_live_stream_error():
21
+ try:
22
+ raise LiveStreamError(video_id="YLnZklYFe7E")
23
+ except LiveStreamError as e:
24
+ assert e.video_id == "YLnZklYFe7E"
25
+ assert str(e) == "YLnZklYFe7E is streaming live and cannot be loaded"
tests/test_extract.py CHANGED
@@ -77,3 +77,8 @@ def test_mime_type_codec():
77
  def test_mime_type_codec_with_no_match_should_error():
78
  with pytest.raises(RegexMatchError):
79
  extract.mime_type_codec("audio/webm")
 
 
 
 
 
 
77
  def test_mime_type_codec_with_no_match_should_error():
78
  with pytest.raises(RegexMatchError):
79
  extract.mime_type_codec("audio/webm")
80
+
81
+
82
+ def test_get_ytplayer_config_with_no_match_should_error():
83
+ with pytest.raises(RegexMatchError):
84
+ extract.get_ytplayer_config("")
tests/test_mixins.py DELETED
@@ -1,3 +0,0 @@
1
- # -*- coding: utf-8 -*-
2
- def test_pre_signed_video(presigned_video):
3
- assert presigned_video.streams.count() == 12
 
 
 
 
tests/test_query.py CHANGED
@@ -3,11 +3,6 @@
3
  import pytest
4
 
5
 
6
- def test_count(cipher_signature):
7
- """Ensure :meth:`~pytube.StreamQuery.count` returns an accurate amount."""
8
- assert cipher_signature.streams.count() == 22
9
-
10
-
11
  @pytest.mark.parametrize(
12
  ("test_input", "expected"),
13
  [
@@ -30,7 +25,7 @@ def test_count(cipher_signature):
30
  )
31
  def test_filters(test_input, expected, cipher_signature):
32
  """Ensure filters produce the expected results."""
33
- result = [s.itag for s in cipher_signature.streams.filter(**test_input).all()]
34
  assert result == expected
35
 
36
 
@@ -64,8 +59,7 @@ def test_order_by(cipher_signature):
64
  :class:`Stream <Stream>` instances in the expected order.
65
  """
66
  itags = [
67
- s.itag
68
- for s in cipher_signature.streams.filter(type="audio").order_by("itag").all()
69
  ]
70
  assert itags == [140, 249, 250, 251]
71
 
@@ -77,10 +71,7 @@ def test_order_by_descending(cipher_signature):
77
  # numerical values
78
  itags = [
79
  s.itag
80
- for s in cipher_signature.streams.filter(type="audio")
81
- .order_by("itag")
82
- .desc()
83
- .all()
84
  ]
85
  assert itags == [251, 250, 249, 140]
86
 
@@ -91,7 +82,6 @@ def test_order_by_non_numerical(cipher_signature):
91
  for s in cipher_signature.streams.filter(res="360p")
92
  .order_by("mime_type")
93
  .desc()
94
- .all()
95
  ]
96
  assert mime_types == ["video/webm", "video/mp4", "video/mp4"]
97
 
@@ -103,10 +93,7 @@ def test_order_by_ascending(cipher_signature):
103
  # numerical values
104
  itags = [
105
  s.itag
106
- for s in cipher_signature.streams.filter(type="audio")
107
- .order_by("itag")
108
- .asc()
109
- .all()
110
  ]
111
  assert itags == [140, 249, 250, 251]
112
 
@@ -114,16 +101,13 @@ def test_order_by_ascending(cipher_signature):
114
  def test_order_by_non_numerical_ascending(cipher_signature):
115
  mime_types = [
116
  s.mime_type
117
- for s in cipher_signature.streams.filter(res="360p")
118
- .order_by("mime_type")
119
- .asc()
120
- .all()
121
  ]
122
  assert mime_types == ["video/mp4", "video/mp4", "video/webm"]
123
 
124
 
125
  def test_order_by_with_none_values(cipher_signature):
126
- abrs = [s.abr for s in cipher_signature.streams.order_by("abr").asc().all()]
127
  assert abrs == ["50kbps", "70kbps", "96kbps", "128kbps", "160kbps"]
128
 
129
 
@@ -151,7 +135,7 @@ def test_get_highest_resolution(cipher_signature):
151
 
152
 
153
  def test_filter_is_dash(cipher_signature):
154
- streams = cipher_signature.streams.filter(is_dash=False).all()
155
  itags = [s.itag for s in streams]
156
  assert itags == [18, 398, 397, 396, 395, 394]
157
 
@@ -164,9 +148,26 @@ def test_get_audio_only_with_subtype(cipher_signature):
164
  assert cipher_signature.streams.get_audio_only(subtype="webm").itag == 251
165
 
166
 
 
 
 
 
 
167
  def test_otf(cipher_signature):
168
  non_otf = cipher_signature.streams.otf().all()
169
  assert len(non_otf) == 22
170
 
171
  otf = cipher_signature.streams.otf(True).all()
172
  assert len(otf) == 0
 
 
 
 
 
 
 
 
 
 
 
 
 
3
  import pytest
4
 
5
 
 
 
 
 
 
6
  @pytest.mark.parametrize(
7
  ("test_input", "expected"),
8
  [
 
25
  )
26
  def test_filters(test_input, expected, cipher_signature):
27
  """Ensure filters produce the expected results."""
28
+ result = [s.itag for s in cipher_signature.streams.filter(**test_input)]
29
  assert result == expected
30
 
31
 
 
59
  :class:`Stream <Stream>` instances in the expected order.
60
  """
61
  itags = [
62
+ s.itag for s in cipher_signature.streams.filter(type="audio").order_by("itag")
 
63
  ]
64
  assert itags == [140, 249, 250, 251]
65
 
 
71
  # numerical values
72
  itags = [
73
  s.itag
74
+ for s in cipher_signature.streams.filter(type="audio").order_by("itag").desc()
 
 
 
75
  ]
76
  assert itags == [251, 250, 249, 140]
77
 
 
82
  for s in cipher_signature.streams.filter(res="360p")
83
  .order_by("mime_type")
84
  .desc()
 
85
  ]
86
  assert mime_types == ["video/webm", "video/mp4", "video/mp4"]
87
 
 
93
  # numerical values
94
  itags = [
95
  s.itag
96
+ for s in cipher_signature.streams.filter(type="audio").order_by("itag").asc()
 
 
 
97
  ]
98
  assert itags == [140, 249, 250, 251]
99
 
 
101
  def test_order_by_non_numerical_ascending(cipher_signature):
102
  mime_types = [
103
  s.mime_type
104
+ for s in cipher_signature.streams.filter(res="360p").order_by("mime_type").asc()
 
 
 
105
  ]
106
  assert mime_types == ["video/mp4", "video/mp4", "video/webm"]
107
 
108
 
109
  def test_order_by_with_none_values(cipher_signature):
110
+ abrs = [s.abr for s in cipher_signature.streams.order_by("abr").asc()]
111
  assert abrs == ["50kbps", "70kbps", "96kbps", "128kbps", "160kbps"]
112
 
113
 
 
135
 
136
 
137
  def test_filter_is_dash(cipher_signature):
138
+ streams = cipher_signature.streams.filter(is_dash=False)
139
  itags = [s.itag for s in streams]
140
  assert itags == [18, 398, 397, 396, 395, 394]
141
 
 
148
  assert cipher_signature.streams.get_audio_only(subtype="webm").itag == 251
149
 
150
 
151
+ def test_sequence(cipher_signature):
152
+ assert len(cipher_signature.streams) == 22
153
+ assert cipher_signature.streams[0] is not None
154
+
155
+
156
  def test_otf(cipher_signature):
157
  non_otf = cipher_signature.streams.otf().all()
158
  assert len(non_otf) == 22
159
 
160
  otf = cipher_signature.streams.otf(True).all()
161
  assert len(otf) == 0
162
+
163
+
164
+ def test_repr(cipher_signature):
165
+ assert repr(
166
+ cipher_signature.streams.filter(
167
+ progressive=True, subtype="mp4", resolution="360p"
168
+ )
169
+ ) == (
170
+ '[<Stream: itag="18" mime_type="video/mp4" '
171
+ 'res="360p" fps="30fps" vcodec="avc1.42001E" '
172
+ 'acodec="mp4a.40.2" progressive="True" type="video">]'
173
+ )
tests/test_streams.py CHANGED
@@ -45,7 +45,7 @@ def test_caption_tracks(presigned_video):
45
 
46
 
47
  def test_captions(presigned_video):
48
- assert len(presigned_video.captions.all()) == 13
49
 
50
 
51
  def test_description(cipher_signature):
 
45
 
46
 
47
  def test_captions(presigned_video):
48
+ assert len(presigned_video.captions) == 13
49
 
50
 
51
  def test_description(cipher_signature):