forked from python-openxml/python-docx
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathphys_pkg.py
More file actions
155 lines (129 loc) · 4.27 KB
/
phys_pkg.py
File metadata and controls
155 lines (129 loc) · 4.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# encoding: utf-8
"""
Provides a general interface to a *physical* OPC package, such as a zip file.
"""
from __future__ import absolute_import
import os
from zipfile import ZipFile, is_zipfile, ZIP_DEFLATED
from .compat import is_string
from .exceptions import PackageNotFoundError
from .packuri import CONTENT_TYPES_URI
class PhysPkgReader(object):
"""
Factory for physical package reader objects.
"""
def __new__(cls, pkg_file):
# if *pkg_file* is a string, treat it as a path
if is_string(pkg_file):
if os.path.isdir(pkg_file):
reader_cls = _DirPkgReader
elif is_zipfile(pkg_file):
reader_cls = _ZipPkgReader
else:
raise PackageNotFoundError(
"Package not found at '%s'" % pkg_file
)
else: # assume it's a stream and pass it to Zip reader to sort out
reader_cls = _ZipPkgReader
return super(PhysPkgReader, cls).__new__(reader_cls)
class PhysPkgWriter(object):
"""
Factory for physical package writer objects.
"""
def __new__(cls, pkg_file):
return super(PhysPkgWriter, cls).__new__(_ZipPkgWriter)
class _DirPkgReader(PhysPkgReader):
"""
Implements |PhysPkgReader| interface for an OPC package extracted into a
directory.
"""
def __init__(self, path):
"""
*path* is the path to a directory containing an expanded package.
"""
super(_DirPkgReader, self).__init__()
self._path = os.path.abspath(path)
def blob_for(self, pack_uri):
"""
Return contents of file corresponding to *pack_uri* in package
directory.
"""
path = os.path.join(self._path, pack_uri.membername)
with open(path, 'rb') as f:
blob = f.read()
return blob
def close(self):
"""
Provides interface consistency with |ZipFileSystem|, but does
nothing, a directory file system doesn't need closing.
"""
pass
@property
def content_types_xml(self):
"""
Return the `[Content_Types].xml` blob from the package.
"""
return self.blob_for(CONTENT_TYPES_URI)
def rels_xml_for(self, source_uri):
"""
Return rels item XML for source with *source_uri*, or None if the
item has no rels item.
"""
try:
rels_xml = self.blob_for(source_uri.rels_uri)
except IOError:
rels_xml = None
return rels_xml
class _ZipPkgReader(PhysPkgReader):
"""
Implements |PhysPkgReader| interface for a zip file OPC package.
"""
def __init__(self, pkg_file):
super(_ZipPkgReader, self).__init__()
self._zipf = ZipFile(pkg_file, 'r')
def blob_for(self, pack_uri):
"""
Return blob corresponding to *pack_uri*. Raises |ValueError| if no
matching member is present in zip archive.
"""
return self._zipf.read(pack_uri.membername)
def close(self):
"""
Close the zip archive, releasing any resources it is using.
"""
self._zipf.close()
@property
def content_types_xml(self):
"""
Return the `[Content_Types].xml` blob from the zip package.
"""
return self.blob_for(CONTENT_TYPES_URI)
def rels_xml_for(self, source_uri):
"""
Return rels item XML for source with *source_uri* or None if no rels
item is present.
"""
try:
rels_xml = self.blob_for(source_uri.rels_uri)
except KeyError:
rels_xml = None
return rels_xml
class _ZipPkgWriter(PhysPkgWriter):
"""
Implements |PhysPkgWriter| interface for a zip file OPC package.
"""
def __init__(self, pkg_file):
super(_ZipPkgWriter, self).__init__()
self._zipf = ZipFile(pkg_file, 'w', compression=ZIP_DEFLATED)
def close(self):
"""
Close the zip archive, flushing any pending physical writes and
releasing any resources it's using.
"""
self._zipf.close()
def write(self, pack_uri, blob):
"""
Write *blob* to this zip package with the membername corresponding to
*pack_uri*.
"""
self._zipf.writestr(pack_uri.membername, blob)