Merge pull request #44 from hbmartin/request-head
Browse files- pytube/request.py +15 -9
- pytube/streams.py +1 -1
- tests/test_request.py +1 -1
- tests/test_streams.py +16 -16
pytube/request.py
CHANGED
@@ -1,17 +1,22 @@
|
|
1 |
# -*- coding: utf-8 -*-
|
2 |
|
3 |
"""Implements a simple wrapper around urlopen."""
|
4 |
-
from typing import Any, Iterable, Dict
|
5 |
from urllib.request import Request
|
6 |
from urllib.request import urlopen
|
7 |
|
8 |
|
9 |
-
def _execute_request(
|
10 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
11 |
raise ValueError
|
12 |
-
return urlopen(
|
13 |
-
Request(url, headers={"User-Agent": "Mozilla/5.0", "Range": "bytes=0-"})
|
14 |
-
) # nosec
|
15 |
|
16 |
|
17 |
def get(url) -> str:
|
@@ -32,7 +37,7 @@ def stream(url: str, chunk_size: int = 8192) -> Iterable[bytes]:
|
|
32 |
:param int chunk_size: The size in bytes of each chunk. Defaults to 8*1024
|
33 |
:rtype: Iterable[bytes]
|
34 |
"""
|
35 |
-
response = _execute_request(url)
|
36 |
while True:
|
37 |
buf = response.read(chunk_size)
|
38 |
if not buf:
|
@@ -40,7 +45,7 @@ def stream(url: str, chunk_size: int = 8192) -> Iterable[bytes]:
|
|
40 |
yield buf
|
41 |
|
42 |
|
43 |
-
def
|
44 |
"""Fetch headers returned http GET request.
|
45 |
|
46 |
:param str url:
|
@@ -49,4 +54,5 @@ def headers(url: str) -> Dict:
|
|
49 |
:returns:
|
50 |
dictionary of lowercase headers
|
51 |
"""
|
52 |
-
|
|
|
|
1 |
# -*- coding: utf-8 -*-
|
2 |
|
3 |
"""Implements a simple wrapper around urlopen."""
|
4 |
+
from typing import Any, Iterable, Dict, Optional
|
5 |
from urllib.request import Request
|
6 |
from urllib.request import urlopen
|
7 |
|
8 |
|
9 |
+
def _execute_request(
|
10 |
+
url: str, method: Optional[str] = None, headers: Optional[Dict[str, str]] = None
|
11 |
+
) -> Any:
|
12 |
+
base_headers = {"User-Agent": "Mozilla/5.0"}
|
13 |
+
if headers:
|
14 |
+
base_headers.update(headers)
|
15 |
+
if url.lower().startswith("http"):
|
16 |
+
request = Request(url, headers=base_headers, method=method)
|
17 |
+
else:
|
18 |
raise ValueError
|
19 |
+
return urlopen(request) # nosec
|
|
|
|
|
20 |
|
21 |
|
22 |
def get(url) -> str:
|
|
|
37 |
:param int chunk_size: The size in bytes of each chunk. Defaults to 8*1024
|
38 |
:rtype: Iterable[bytes]
|
39 |
"""
|
40 |
+
response = _execute_request(url, headers={"Range": "bytes=0-"})
|
41 |
while True:
|
42 |
buf = response.read(chunk_size)
|
43 |
if not buf:
|
|
|
45 |
yield buf
|
46 |
|
47 |
|
48 |
+
def head(url: str) -> Dict:
|
49 |
"""Fetch headers returned http GET request.
|
50 |
|
51 |
:param str url:
|
|
|
54 |
:returns:
|
55 |
dictionary of lowercase headers
|
56 |
"""
|
57 |
+
response_headers = _execute_request(url, method="HEAD").info()
|
58 |
+
return {k.lower(): v for k, v in response_headers.items()}
|
pytube/streams.py
CHANGED
@@ -140,7 +140,7 @@ class Stream:
|
|
140 |
Filesize (in bytes) of the stream.
|
141 |
"""
|
142 |
if self._filesize is None:
|
143 |
-
headers = request.
|
144 |
self._filesize = int(headers["content-length"])
|
145 |
return self._filesize
|
146 |
|
|
|
140 |
Filesize (in bytes) of the stream.
|
141 |
"""
|
142 |
if self._filesize is None:
|
143 |
+
headers = request.head(self.url)
|
144 |
self._filesize = int(headers["content-length"])
|
145 |
return self._filesize
|
146 |
|
tests/test_request.py
CHANGED
@@ -30,7 +30,7 @@ def test_headers(mock_urlopen):
|
|
30 |
response = mock.Mock()
|
31 |
response.info.return_value = {"content-length": "16384"}
|
32 |
mock_urlopen.return_value = response
|
33 |
-
response = request.
|
34 |
assert response == {"content-length": "16384"}
|
35 |
|
36 |
|
|
|
30 |
response = mock.Mock()
|
31 |
response.info.return_value = {"content-length": "16384"}
|
32 |
mock_urlopen.return_value = response
|
33 |
+
response = request.head("http://fakeassurl.gov")
|
34 |
assert response == {"content-length": "16384"}
|
35 |
|
36 |
|
tests/test_streams.py
CHANGED
@@ -10,8 +10,8 @@ from pytube import Stream, streams
|
|
10 |
|
11 |
|
12 |
def test_filesize(cipher_signature, mocker):
|
13 |
-
mocker.patch.object(request, "
|
14 |
-
request.
|
15 |
assert cipher_signature.streams.first().filesize == 6796391
|
16 |
|
17 |
|
@@ -87,8 +87,8 @@ def test_views(cipher_signature):
|
|
87 |
|
88 |
|
89 |
def test_download(cipher_signature, mocker):
|
90 |
-
mocker.patch.object(request, "
|
91 |
-
request.
|
92 |
mocker.patch.object(request, "stream")
|
93 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
94 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
@@ -97,8 +97,8 @@ def test_download(cipher_signature, mocker):
|
|
97 |
|
98 |
|
99 |
def test_download_with_prefix(cipher_signature, mocker):
|
100 |
-
mocker.patch.object(request, "
|
101 |
-
request.
|
102 |
mocker.patch.object(request, "stream")
|
103 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
104 |
streams.target_directory = MagicMock(return_value="/target")
|
@@ -109,8 +109,8 @@ def test_download_with_prefix(cipher_signature, mocker):
|
|
109 |
|
110 |
|
111 |
def test_download_with_filename(cipher_signature, mocker):
|
112 |
-
mocker.patch.object(request, "
|
113 |
-
request.
|
114 |
mocker.patch.object(request, "stream")
|
115 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
116 |
streams.target_directory = MagicMock(return_value="/target")
|
@@ -121,8 +121,8 @@ def test_download_with_filename(cipher_signature, mocker):
|
|
121 |
|
122 |
|
123 |
def test_download_with_existing(cipher_signature, mocker):
|
124 |
-
mocker.patch.object(request, "
|
125 |
-
request.
|
126 |
mocker.patch.object(request, "stream")
|
127 |
streams.target_directory = MagicMock(return_value="/target")
|
128 |
mocker.patch.object(os.path, "isfile")
|
@@ -137,8 +137,8 @@ def test_download_with_existing(cipher_signature, mocker):
|
|
137 |
|
138 |
|
139 |
def test_download_with_existing_no_skip(cipher_signature, mocker):
|
140 |
-
mocker.patch.object(request, "
|
141 |
-
request.
|
142 |
mocker.patch.object(request, "stream")
|
143 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
144 |
streams.target_directory = MagicMock(return_value="/target")
|
@@ -167,8 +167,8 @@ def test_on_progress_hook(cipher_signature, mocker):
|
|
167 |
callback_fn = mock.MagicMock()
|
168 |
cipher_signature.register_on_progress_callback(callback_fn)
|
169 |
|
170 |
-
mocker.patch.object(request, "
|
171 |
-
request.
|
172 |
mocker.patch.object(request, "stream")
|
173 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
174 |
|
@@ -186,8 +186,8 @@ def test_on_complete_hook(cipher_signature, mocker):
|
|
186 |
callback_fn = mock.MagicMock()
|
187 |
cipher_signature.register_on_complete_callback(callback_fn)
|
188 |
|
189 |
-
mocker.patch.object(request, "
|
190 |
-
request.
|
191 |
mocker.patch.object(request, "stream")
|
192 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
193 |
|
|
|
10 |
|
11 |
|
12 |
def test_filesize(cipher_signature, mocker):
|
13 |
+
mocker.patch.object(request, "head")
|
14 |
+
request.head.return_value = {"content-length": "6796391"}
|
15 |
assert cipher_signature.streams.first().filesize == 6796391
|
16 |
|
17 |
|
|
|
87 |
|
88 |
|
89 |
def test_download(cipher_signature, mocker):
|
90 |
+
mocker.patch.object(request, "head")
|
91 |
+
request.head.return_value = {"content-length": "16384"}
|
92 |
mocker.patch.object(request, "stream")
|
93 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
94 |
with mock.patch("pytube.streams.open", mock.mock_open(), create=True):
|
|
|
97 |
|
98 |
|
99 |
def test_download_with_prefix(cipher_signature, mocker):
|
100 |
+
mocker.patch.object(request, "head")
|
101 |
+
request.head.return_value = {"content-length": "16384"}
|
102 |
mocker.patch.object(request, "stream")
|
103 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
104 |
streams.target_directory = MagicMock(return_value="/target")
|
|
|
109 |
|
110 |
|
111 |
def test_download_with_filename(cipher_signature, mocker):
|
112 |
+
mocker.patch.object(request, "head")
|
113 |
+
request.head.return_value = {"content-length": "16384"}
|
114 |
mocker.patch.object(request, "stream")
|
115 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
116 |
streams.target_directory = MagicMock(return_value="/target")
|
|
|
121 |
|
122 |
|
123 |
def test_download_with_existing(cipher_signature, mocker):
|
124 |
+
mocker.patch.object(request, "head")
|
125 |
+
request.head.return_value = {"content-length": "16384"}
|
126 |
mocker.patch.object(request, "stream")
|
127 |
streams.target_directory = MagicMock(return_value="/target")
|
128 |
mocker.patch.object(os.path, "isfile")
|
|
|
137 |
|
138 |
|
139 |
def test_download_with_existing_no_skip(cipher_signature, mocker):
|
140 |
+
mocker.patch.object(request, "head")
|
141 |
+
request.head.return_value = {"content-length": "16384"}
|
142 |
mocker.patch.object(request, "stream")
|
143 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
144 |
streams.target_directory = MagicMock(return_value="/target")
|
|
|
167 |
callback_fn = mock.MagicMock()
|
168 |
cipher_signature.register_on_progress_callback(callback_fn)
|
169 |
|
170 |
+
mocker.patch.object(request, "head")
|
171 |
+
request.head.return_value = {"content-length": "16384"}
|
172 |
mocker.patch.object(request, "stream")
|
173 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
174 |
|
|
|
186 |
callback_fn = mock.MagicMock()
|
187 |
cipher_signature.register_on_complete_callback(callback_fn)
|
188 |
|
189 |
+
mocker.patch.object(request, "head")
|
190 |
+
request.head.return_value = {"content-length": "16384"}
|
191 |
mocker.patch.object(request, "stream")
|
192 |
request.stream.return_value = iter([str(random.getrandbits(8 * 1024))])
|
193 |
|