forked from RustPython/RustPython
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpy_io.rs
More file actions
97 lines (88 loc) · 2.64 KB
/
py_io.rs
File metadata and controls
97 lines (88 loc) · 2.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use crate::{
builtins::{PyBaseExceptionRef, PyBytes, PyStr},
common::ascii,
PyObject, PyObjectRef, PyResult, VirtualMachine,
};
use std::{fmt, io, ops};
/// A formatted-output sink that is generic over the error it reports.
///
/// Exposing a `write_fmt` method lets implementors be used as the target of
/// the standard `write!`/`writeln!` macros, while the associated `Error` type
/// lets every sink fail with its own native error.
pub trait Write {
    /// The error returned when a write does not succeed.
    type Error;

    /// Writes the already-captured format arguments `args` into this sink.
    fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> Result<(), Self::Error>;
}
/// Newtype wrapper around a value of type `T`.
///
/// The wrapper is `#[repr(transparent)]`, so it has exactly the same layout
/// as the `T` it contains; [`IoWriter::from_ref`] depends on that guarantee.
#[repr(transparent)]
pub struct IoWriter<T>(pub T);

impl<T> IoWriter<T> {
    /// Reinterprets a mutable borrow of `T` as a mutable borrow of
    /// `IoWriter<T>`, without moving or copying the underlying value.
    ///
    /// The returned reference aliases `x` and lives exactly as long as the
    /// borrow that was passed in.
    pub fn from_ref(x: &mut T) -> &mut Self {
        let wrapped = (x as *mut T).cast::<Self>();
        // SAFETY: `IoWriter<T>` is `repr(transparent)` over `T`, so both types
        // share size, alignment and layout. The pointer comes from a live
        // exclusive borrow, and the result inherits that borrow's lifetime.
        unsafe { &mut *wrapped }
    }
}
impl<T> ops::Deref for IoWriter<T> {
type Target = T;
fn deref(&self) -> &T {
&self.0
}
}
impl<T> ops::DerefMut for IoWriter<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.0
}
}
/// Adapts any `std::io::Write` sink to this module's [`Write`] trait,
/// reporting failures as `io::Error`.
impl<W: io::Write> Write for IoWriter<W> {
    type Error = io::Error;

    fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> Result<(), Self::Error> {
        // Name the std trait explicitly so the call goes to the wrapped
        // writer's `io::Write::write_fmt`, not back to this method.
        io::Write::write_fmt(&mut self.0, args)
    }
}
/// Lets a plain `String` act as a [`Write`] sink; formatted text is appended
/// to the string and failures are reported as `fmt::Error`.
impl Write for String {
    type Error = fmt::Error;

    fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> Result<(), Self::Error> {
        // Delegate to the standard `fmt::Write` implementation for `String`.
        fmt::Write::write_fmt(self, args)
    }
}
pub struct PyWriter<'vm>(pub PyObjectRef, pub &'vm VirtualMachine);
impl Write for PyWriter<'_> {
type Error = PyBaseExceptionRef;
fn write_fmt(&mut self, args: fmt::Arguments) -> Result<(), Self::Error> {
let PyWriter(obj, vm) = self;
vm.call_method(obj, "write", (args.to_string(),)).map(drop)
}
}
/// Reads one line from `obj` by calling its `readline` method and returns the
/// line without its trailing `'\n'`.
///
/// When `size` is `Some(n)`, `n` is passed as the only argument to
/// `readline`; otherwise `readline` is called with no arguments.
///
/// # Errors
///
/// * Any error returned by the `readline` call itself.
/// * An `EOFError` ("EOF when reading a line") when `readline` returns an
///   empty `str` or `bytes` object.
/// * A `TypeError` when `readline` returns something that matches neither
///   `PyStr` nor `PyBytes`.
pub fn file_readline(obj: &PyObject, size: Option<usize>, vm: &VirtualMachine) -> PyResult {
    let call_args = match size {
        Some(limit) => vec![vm.ctx.new_int(limit).into()],
        None => Vec::new(),
    };
    let line = vm.call_method(obj, "readline", call_args)?;

    // Built lazily: only the empty-result paths need the exception object.
    let eof = || {
        let message = vm.ctx.new_str(ascii!("EOF when reading a line")).into();
        vm.new_exception(vm.ctx.exceptions.eof_error.to_owned(), vec![message])
    };

    let stripped = match_class!(match line {
        text @ PyStr => {
            let raw = text.as_str();
            if raw.is_empty() {
                return Err(eof());
            }
            // At most one trailing newline is removed; a line without one is
            // handed back as the very same object.
            match raw.strip_suffix('\n') {
                Some(head) => vm.ctx.new_str(head).into(),
                None => text.into(),
            }
        }
        data @ PyBytes => {
            let raw = data.as_bytes();
            if raw.is_empty() {
                return Err(eof());
            }
            match raw.strip_suffix(b"\n") {
                Some(head) => vm.ctx.new_bytes(head.to_vec()).into(),
                None => data.into(),
            }
        }
        _ => return Err(vm.new_type_error("object.readline() returned non-string".to_owned())),
    });
    Ok(stripped)
}