"""
Images Pipeline
See documentation in topics/media-pipeline.rst
"""
from __future__ import annotations
import functools
import hashlib
import warnings
from contextlib import suppress
from io import BytesIO
from os import PathLike
from typing import TYPE_CHECKING, Dict, Tuple, Type, Union, cast
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem, NotConfigured, ScrapyDeprecationWarning
from scrapy.http import Request
from scrapy.http.request import NO_CALLBACK
from scrapy.pipelines.files import (
FileException,
FilesPipeline,
FTPFilesStore,
GCSFilesStore,
S3FilesStore,
_md5sum,
)
# TODO: from scrapy.pipelines.media import MediaPipeline
from scrapy.settings import Settings
from scrapy.utils.python import get_func_args, to_bytes
if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self
class NoimagesDrop(DropItem):
    """Deprecated exception for a product that has no images.

    Instantiating it emits a ``ScrapyDeprecationWarning`` and then defers to
    the ``DropItem`` constructor with the arguments it was given.
    """

    def __init__(self, *args, **kwargs):
        # The warning is issued directly from __init__ so that stacklevel=2
        # attributes it to the code that instantiated the exception.
        warnings.warn(
            "The NoimagesDrop class is deprecated",
            ScrapyDeprecationWarning,
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)
class ImageException(FileException):
    """Generic error raised while processing an image.

    Adds no behaviour of its own; it exists so image failures can be told
    apart from other ``FileException`` errors.
    """
class ImagesPipeline(FilesPipeline):
    """Abstract pipeline that implements the image thumbnail generation logic.

    Every downloaded image is opened with Pillow, checked against the
    configured minimum dimensions, normalised to an RGB JPEG and persisted
    through ``self.store``. One additional JPEG is produced for each entry
    in ``self.thumbs``.
    """

    MEDIA_NAME = "image"

    # Uppercase attributes kept for backward compatibility with code that subclasses
    # ImagesPipeline. They may be overridden by settings.
    MIN_WIDTH = 0
    MIN_HEIGHT = 0
    EXPIRES = 90
    THUMBS: Dict[str, Tuple[int, int]] = {}
    DEFAULT_IMAGES_URLS_FIELD = "image_urls"
    DEFAULT_IMAGES_RESULT_FIELD = "images"

    def __init__(
        self, store_uri: Union[str, PathLike], download_func=None, settings=None
    ):
        """Set up the pipeline and read its image-related settings.

        :param store_uri: location passed on to ``FilesPipeline.__init__``.
        :param download_func: passed on to ``FilesPipeline.__init__``.
        :param settings: a ``Settings`` object, a plain ``dict`` or ``None``;
            the latter two are wrapped in ``Settings`` before being read.
        :raises NotConfigured: if Pillow cannot be imported.
        """
        # Pillow is imported lazily so that its absence only disables this
        # pipeline. Only the import itself is guarded by the try block.
        try:
            from PIL import Image
        except ImportError as exc:
            raise NotConfigured(
                "ImagesPipeline requires installing Pillow 4.0.0 or later"
            ) from exc
        self._Image = Image

        super().__init__(store_uri, settings=settings, download_func=download_func)

        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)

        # NOTE(review): `_key_for_pipe` is inherited and not visible here;
        # presumably it maps a setting name to a subclass-specific variant
        # when one is defined -- confirm against the base class.
        resolve = functools.partial(
            self._key_for_pipe,
            base_class_name="ImagesPipeline",
            settings=settings,
        )
        self.expires = settings.getint(resolve("IMAGES_EXPIRES"), self.EXPIRES)

        # Subclasses may define IMAGES_RESULT_FIELD / IMAGES_URLS_FIELD; fall
        # back to the DEFAULT_* class attributes only when they do not.
        if not hasattr(self, "IMAGES_RESULT_FIELD"):
            self.IMAGES_RESULT_FIELD = self.DEFAULT_IMAGES_RESULT_FIELD
        if not hasattr(self, "IMAGES_URLS_FIELD"):
            self.IMAGES_URLS_FIELD = self.DEFAULT_IMAGES_URLS_FIELD

        self.images_urls_field = settings.get(
            resolve("IMAGES_URLS_FIELD"), self.IMAGES_URLS_FIELD
        )
        self.images_result_field = settings.get(
            resolve("IMAGES_RESULT_FIELD"), self.IMAGES_RESULT_FIELD
        )
        self.min_width = settings.getint(resolve("IMAGES_MIN_WIDTH"), self.MIN_WIDTH)
        self.min_height = settings.getint(resolve("IMAGES_MIN_HEIGHT"), self.MIN_HEIGHT)
        self.thumbs = settings.get(resolve("IMAGES_THUMBS"), self.THUMBS)

        # Tri-state flag: None until get_images() has inspected the signature
        # of convert_image(), then True/False.
        self._deprecated_convert_image = None

    @classmethod
    def from_settings(cls, settings) -> Self:
        """Build a pipeline from *settings*.

        Copies credentials and options from *settings* onto the store classes
        registered in ``cls.STORE_SCHEMES`` under ``"s3"``, ``"gs"`` and
        ``"ftp"``. These are class-level assignments, so they affect the store
        classes themselves. The pipeline is then instantiated with the
        ``IMAGES_STORE`` setting as its store URI.
        """
        s3store: Type[S3FilesStore] = cast(Type[S3FilesStore], cls.STORE_SCHEMES["s3"])
        s3store.AWS_ACCESS_KEY_ID = settings["AWS_ACCESS_KEY_ID"]
        s3store.AWS_SECRET_ACCESS_KEY = settings["AWS_SECRET_ACCESS_KEY"]
        s3store.AWS_SESSION_TOKEN = settings["AWS_SESSION_TOKEN"]
        s3store.AWS_ENDPOINT_URL = settings["AWS_ENDPOINT_URL"]
        s3store.AWS_REGION_NAME = settings["AWS_REGION_NAME"]
        s3store.AWS_USE_SSL = settings["AWS_USE_SSL"]
        s3store.AWS_VERIFY = settings["AWS_VERIFY"]
        s3store.POLICY = settings["IMAGES_STORE_S3_ACL"]

        gcs_store: Type[GCSFilesStore] = cast(
            Type[GCSFilesStore], cls.STORE_SCHEMES["gs"]
        )
        gcs_store.GCS_PROJECT_ID = settings["GCS_PROJECT_ID"]
        gcs_store.POLICY = settings["IMAGES_STORE_GCS_ACL"] or None

        ftp_store: Type[FTPFilesStore] = cast(
            Type[FTPFilesStore], cls.STORE_SCHEMES["ftp"]
        )
        ftp_store.FTP_USERNAME = settings["FTP_USER"]
        ftp_store.FTP_PASSWORD = settings["FTP_PASSWORD"]
        ftp_store.USE_ACTIVE_MODE = settings.getbool("FEED_STORAGE_FTP_ACTIVE")

        store_uri = settings["IMAGES_STORE"]
        return cls(store_uri, settings=settings)

    def file_downloaded(self, response, request, info, *, item=None):
        """Delegate to :meth:`image_downloaded` with the same arguments."""
        return self.image_downloaded(response, request, info, item=item)

    def image_downloaded(self, response, request, info, *, item=None):
        """Persist the image and its thumbnails and return a checksum.

        Every ``(path, image, buf)`` triple yielded by :meth:`get_images` is
        written through ``self.store.persist_file`` with the image dimensions
        as metadata and a JPEG content type.

        :returns: the value of ``_md5sum`` for the first yielded buffer, or
            ``None`` if :meth:`get_images` yields nothing.
        """
        checksum = None
        for path, image, buf in self.get_images(response, request, info, item=item):
            if checksum is None:
                # Only the first buffer (the full-size image) is checksummed.
                buf.seek(0)
                checksum = _md5sum(buf)
            width, height = image.size
            self.store.persist_file(
                path,
                buf,
                info,
                meta={"width": width, "height": height},
                headers={"Content-Type": "image/jpeg"},
            )
        return checksum

    def get_images(self, response, request, info, *, item=None):
        """Yield ``(path, image, buffer)`` for the image and each thumbnail.

        The first triple is the converted full-size image; one further triple
        follows per entry in ``self.thumbs``.

        :raises ImageException: if the image is narrower than
            ``self.min_width`` or shorter than ``self.min_height``.
        """
        path = self.file_path(request, response=response, info=info, item=item)
        orig_image = self._Image.open(BytesIO(response.body))

        width, height = orig_image.size
        if width < self.min_width or height < self.min_height:
            raise ImageException(
                "Image too small "
                f"({width}x{height} < "
                f"{self.min_width}x{self.min_height})"
            )

        # Detect once per pipeline instance whether convert_image() has been
        # overridden with a signature lacking ``response_body``; warn at that
        # point only.
        if self._deprecated_convert_image is None:
            self._deprecated_convert_image = "response_body" not in get_func_args(
                self.convert_image
            )
            if self._deprecated_convert_image:
                warnings.warn(
                    f"{self.__class__.__name__}.convert_image() method overridden in a deprecated way, "
                    "overridden method does not accept response_body argument.",
                    category=ScrapyDeprecationWarning,
                )

        if self._deprecated_convert_image:
            image, buf = self.convert_image(orig_image)
        else:
            image, buf = self.convert_image(
                orig_image, response_body=BytesIO(response.body)
            )
        yield path, image, buf

        # Thumbnails are derived from the already-converted image.
        for thumb_id, size in self.thumbs.items():
            thumb_path = self.thumb_path(
                request, thumb_id, response=response, info=info, item=item
            )
            if self._deprecated_convert_image:
                thumb_image, thumb_buf = self.convert_image(image, size)
            else:
                thumb_image, thumb_buf = self.convert_image(image, size, buf)
            yield thumb_path, thumb_image, thumb_buf

    def _composite_on_white(self, rgba_image):
        """Paste *rgba_image* onto a white RGBA canvas and return it as RGB."""
        background = self._Image.new("RGBA", rgba_image.size, (255, 255, 255))
        # The image doubles as its own paste mask.
        background.paste(rgba_image, rgba_image)
        return background.convert("RGB")

    def convert_image(self, image, size=None, response_body=None):
        """Convert *image* to RGB, optionally shrink it, and encode it as JPEG.

        :param image: the Pillow image to convert.
        :param size: when truthy, passed to ``image.thumbnail`` as the target
            size; the resize is done on a copy.
        :param response_body: buffer holding the originally downloaded bytes.
            Omitting it is deprecated and triggers a
            ``ScrapyDeprecationWarning``.
        :returns: ``(image, buffer)``. When no *size* is requested, a
            *response_body* was supplied and ``image.format`` is ``"JPEG"``
            after the mode conversion step, the supplied *response_body* is
            returned as the buffer without re-encoding; otherwise the buffer
            is a new ``BytesIO`` containing the image saved as JPEG.
        """
        if response_body is None:
            warnings.warn(
                f"{self.__class__.__name__}.convert_image() method called in a deprecated way, "
                "method called without response_body argument.",
                category=ScrapyDeprecationWarning,
                stacklevel=2,
            )

        if image.format in ("PNG", "WEBP") and image.mode == "RGBA":
            image = self._composite_on_white(image)
        elif image.mode == "P":
            image = self._composite_on_white(image.convert("RGBA"))
        elif image.mode != "RGB":
            image = image.convert("RGB")

        if size:
            # thumbnail() resizes in place, so operate on a copy.
            image = image.copy()
            try:
                # Image.Resampling.LANCZOS was added in Pillow 9.1.0
                # remove this try except block,
                # when updating the minimum requirements for Pillow.
                resampling_filter = self._Image.Resampling.LANCZOS
            except AttributeError:
                resampling_filter = self._Image.ANTIALIAS
            image.thumbnail(size, resampling_filter)
        elif response_body is not None and image.format == "JPEG":
            return image, response_body

        buf = BytesIO()
        image.save(buf, "JPEG")
        return image, buf

    def item_completed(self, results, item, info):
        """Store the successful results on *item* and return it.

        The values of all ``(ok, value)`` pairs in *results* whose ``ok`` is
        truthy are assigned to the ``self.images_result_field`` field. A
        ``KeyError`` raised by that assignment is suppressed, in which case
        the item is returned unchanged.
        """
        with suppress(KeyError):
            ItemAdapter(item)[self.images_result_field] = [x for ok, x in results if ok]
        return item

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return ``full/<sha1 hex digest of request.url>.jpg``."""
        image_guid = hashlib.sha1(to_bytes(request.url)).hexdigest()  # nosec
        return f"full/{image_guid}.jpg"

    def thumb_path(self, request, thumb_id, response=None, info=None, *, item=None):
        """Return ``thumbs/<thumb_id>/<sha1 hex digest of request.url>.jpg``."""
        thumb_guid = hashlib.sha1(to_bytes(request.url)).hexdigest()  # nosec
        return f"thumbs/{thumb_id}/{thumb_guid}.jpg"