forked from python-openxml/python-opc
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpackage.py
More file actions
333 lines (292 loc) · 10.7 KB
/
package.py
File metadata and controls
333 lines (292 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# -*- coding: utf-8 -*-
#
# package.py
#
# Copyright (C) 2012, 2013 Steve Canny scanny@cisco.com
#
# This module is part of python-opc and is released under the MIT License:
# http://www.opensource.org/licenses/mit-license.php
"""
Provides an API for manipulating Open Packaging Convention (OPC) packages.
"""
from opc.constants import RELATIONSHIP_TYPE as RT
from opc.oxml import CT_Relationships
from opc.packuri import PACKAGE_URI
from opc.pkgreader import PackageReader
from opc.pkgwriter import PackageWriter
class OpcPackage(object):
    """
    Main API class for |python-opc|. A new instance is constructed by calling
    the :meth:`open` class method with a path to a package file or file-like
    object containing one.
    """
    def __init__(self):
        super(OpcPackage, self).__init__()
        # package-level relationships use the package pseudo-URI as their base
        self._rels = RelationshipCollection(PACKAGE_URI.baseURI)

    @property
    def main_document(self):
        """
        Return a reference to the main document part for this package.
        Examples include a document part for a WordprocessingML package, a
        presentation part for a PresentationML package, or a workbook part
        for a SpreadsheetML package.

        The part is located by looking up the single package relationship
        of type ``RT.OFFICE_DOCUMENT``; the lookup raises |KeyError| when no
        such relationship exists and |ValueError| when there is more than
        one.
        """
        rel = self._rels.get_rel_of_type(RT.OFFICE_DOCUMENT)
        return rel.target_part

    @classmethod
    def open(cls, pkg_file):
        """
        Return an instance of this class loaded with the contents of
        *pkg_file*.

        This is a class method so that calling it on a subclass produces an
        instance of that subclass rather than a plain |OpcPackage|.
        """
        pkg = cls()
        pkg_reader = PackageReader.from_file(pkg_file)
        Unmarshaller.unmarshal(pkg_reader, pkg, PartFactory)
        return pkg

    @property
    def parts(self):
        """
        Return an immutable sequence (tuple) containing a reference to each
        of the parts in this package. The tuple is rebuilt on every access
        by walking the relationship graph from the package relationships.
        """
        return tuple(self._walk_parts(self._rels))

    @property
    def rels(self):
        """
        Return a reference to the |RelationshipCollection| holding the
        relationships for this package.
        """
        return self._rels

    def save(self, pkg_file):
        """
        Save this package to *pkg_file*, where *pkg_file* can be either a
        path to a file (a string) or a file-like object.
        """
        for part in self.parts:
            part._before_marshal()
        # ``self.parts`` is evaluated again here rather than reused, so the
        # writer receives the graph as it stands after the
        # ``_before_marshal`` hooks have run.
        PackageWriter.write(pkg_file, self._rels, self.parts)

    def _add_relationship(self, reltype, target, rId, external=False):
        """
        Return newly added |_Relationship| instance of *reltype* between this
        package and part *target* with key *rId*. *external* is passed
        through unchanged and marks the relationship's target as external
        when true.
        """
        return self._rels.add_relationship(reltype, target, rId, external)

    @staticmethod
    def _walk_parts(rels, visited_parts=None):
        """
        Generate exactly one reference to each of the parts in the package by
        performing a depth-first traversal of the rels graph.

        *rels* is the iterable of relationships to start from.
        *visited_parts* is the list of parts already yielded; it is shared
        (and appended to) across the recursive calls and should normally be
        left as |None| by outside callers. External relationships are
        skipped because they have no target part.
        """
        if visited_parts is None:
            visited_parts = []
        for rel in rels:
            if rel.is_external:
                continue
            part = rel.target_part
            if part in visited_parts:
                continue
            visited_parts.append(part)
            yield part
            # descend into the relationships of the part just yielded
            for descendant in OpcPackage._walk_parts(part._rels,
                                                     visited_parts):
                yield descendant
class Part(object):
    """
    Generic package part and base class for all parts. Supplies the
    properties and hooks every part shares; client code is expected to
    subclass it to add behavior for a particular kind of part.
    """
    def __init__(self, partname, content_type, blob=None):
        super(Part, self).__init__()
        self._rels = RelationshipCollection(partname.baseURI)
        self._blob = blob
        self._content_type = content_type
        self._partname = partname

    @property
    def blob(self):
        """
        The bytes making up this part, as provided at construction. The
        content may be either text or binary; |None| if no blob was given.
        """
        return self._blob

    @property
    def content_type(self):
        """
        The content type this part was constructed with.
        """
        return self._content_type

    @property
    def partname(self):
        """
        The |PackURI| instance holding the partname of this part.
        """
        return self._partname

    @property
    def rels(self):
        """
        The |RelationshipCollection| instance holding this part's
        relationships.
        """
        return self._rels

    def _add_relationship(self, reltype, target, rId, external=False):
        """
        Add a relationship of *reltype* from this part to *target*, keyed by
        *rId*, and return the new |_Relationship| instance. *external* is
        handed on unchanged and flags the target as external when true.
        """
        rels = self._rels
        return rels.add_relationship(reltype, target, rId, external)

    def _after_unmarshal(self):
        """
        Hook called once unmarshaling is complete, for example to parse the
        part XML. Subclasses may override it and need not call super.
        """
        # intentionally empty; exists only so the call succeeds when a
        # subclass does not override it

    def _before_marshal(self):
        """
        Hook called just before serialization, for example to settle part
        naming where required. Subclasses may override it and need not call
        super.
        """
        # intentionally empty; exists only so the call succeeds when a
        # subclass does not override it
class PartFactory(object):
    """
    Lets client code choose which |Part| subclass gets constructed for a
    given content type. Register a class by assigning it to
    ``PartFactory.part_type_for[content_type]``; "instantiating"
    |PartFactory| then returns a part rather than a |PartFactory| object.
    """
    # maps a content type to the custom part class registered for it
    part_type_for = {}

    def __new__(cls, partname, content_type, blob):
        # A registered class is built through its ``load`` method; any other
        # content type falls back to a plain |Part|.
        try:
            part_class = PartFactory.part_type_for[content_type]
        except KeyError:
            return Part(partname, content_type, blob)
        return part_class.load(partname, content_type, blob)
class _Relationship(object):
"""
Value object for relationship to part.
"""
def __init__(self, rId, reltype, target, baseURI, external=False):
super(_Relationship, self).__init__()
self._rId = rId
self._reltype = reltype
self._target = target
self._baseURI = baseURI
self._is_external = bool(external)
@property
def is_external(self):
return self._is_external
@property
def reltype(self):
return self._reltype
@property
def rId(self):
return self._rId
@property
def target_part(self):
if self._is_external:
raise ValueError("target_part property on _Relationship is undef"
"ined when target mode is External")
return self._target
@property
def target_ref(self):
if self._is_external:
return self._target
else:
return self._target.partname.relative_ref(self._baseURI)
class RelationshipCollection(object):
    """
    Collection object for |_Relationship| instances, having list semantics.
    Relationships are kept in the order they were added and can also be
    looked up by rId.
    """
    # ``basestring`` only exists on Python 2; on Python 3 fall back to
    # ``str`` so that subscript access keeps working instead of raising
    # NameError.
    try:
        _string_types = (basestring,)  # noqa: F821
    except NameError:
        _string_types = (str,)

    def __init__(self, baseURI):
        super(RelationshipCollection, self).__init__()
        self._baseURI = baseURI
        self._rels = []

    def __getitem__(self, key):
        """
        Implements access by subscript, e.g. ``rels[9]``. It also implements
        dict-style lookup of a relationship by rId, e.g. ``rels['rId1']``.
        Raises |KeyError| when a string *key* matches no rId; a non-string
        *key* is handed to the underlying list, which raises as a list would.
        """
        if isinstance(key, self._string_types):
            for rel in self._rels:
                if rel.rId == key:
                    return rel
            raise KeyError("no rId '%s' in RelationshipCollection" % key)
        return self._rels[key]

    def __iter__(self):
        """Implements iteration over the relationships in insertion order"""
        return iter(self._rels)

    def __len__(self):
        """Implements len() built-in on this object"""
        return len(self._rels)

    def add_relationship(self, reltype, target, rId, external=False):
        """
        Return a newly added |_Relationship| instance of *reltype* to
        *target* with key *rId*, created with this collection's base URI and
        appended to the end of the collection. No check is made for an rId
        that is already present.
        """
        rel = _Relationship(rId, reltype, target, self._baseURI, external)
        self._rels.append(rel)
        return rel

    def get_rel_of_type(self, reltype):
        """
        Return single relationship of type *reltype* from the collection.
        Raises |KeyError| if no matching relationship is found. Raises
        |ValueError| if more than one matching relationship is found.
        """
        matching = [rel for rel in self._rels if rel.reltype == reltype]
        if not matching:
            tmpl = "no relationship of type '%s' in collection"
            raise KeyError(tmpl % reltype)
        if len(matching) > 1:
            tmpl = "multiple relationships of type '%s' in collection"
            raise ValueError(tmpl % reltype)
        return matching[0]

    @property
    def xml(self):
        """
        Serialize this relationship collection into XML suitable for storage
        as a .rels file in an OPC package.
        """
        rels_elm = CT_Relationships.new()
        for rel in self._rels:
            rels_elm.add_rel(rel.rId, rel.reltype, rel.target_ref,
                             rel.is_external)
        return rels_elm.xml
class Unmarshaller(object):
    """
    Namespace for the static methods that rebuild a package's object graph
    from a |PackageReader| instance.
    """
    @staticmethod
    def unmarshal(pkg_reader, pkg, part_factory):
        """
        Build the graph of parts and their resolved relationships from the
        contents of *pkg_reader*. Each part is constructed by calling
        *part_factory*; package-level relationships are added to *pkg*.
        After all relationships are in place, ``_after_unmarshal()`` is
        called on every part.
        """
        parts_by_name = Unmarshaller._unmarshal_parts(pkg_reader,
                                                      part_factory)
        Unmarshaller._unmarshal_relationships(pkg_reader, pkg, parts_by_name)
        for loaded_part in parts_by_name.values():
            loaded_part._after_unmarshal()

    @staticmethod
    def _unmarshal_parts(pkg_reader, part_factory):
        """
        Return a dict, keyed by partname, holding one part for each
        serialized part yielded by ``pkg_reader.iter_sparts()``. Each part
        is the result of calling *part_factory* with the partname, content
        type and blob of the serialized part.
        """
        return dict(
            (partname, part_factory(partname, content_type, blob))
            for partname, content_type, blob in pkg_reader.iter_sparts()
        )

    @staticmethod
    def _unmarshal_relationships(pkg_reader, pkg, parts):
        """
        For each serialized relationship yielded by
        ``pkg_reader.iter_srels()``, add a relationship to its source
        object. The source is *pkg* when the source URI is ``'/'``,
        otherwise the part in *parts* with that partname. The target is the
        target reference for an external relationship, otherwise the actual
        target part looked up in *parts*.
        """
        for source_uri, srel in pkg_reader.iter_srels():
            if source_uri == '/':
                source = pkg
            else:
                source = parts[source_uri]

            if srel.is_external:
                target = srel.target_ref
            else:
                target = parts[srel.target_partname]

            source._add_relationship(
                srel.reltype, target, srel.rId, srel.is_external
            )