#!/usr/bin/env python
# InputStream.py -- forked from bityon/python-hadoop
# ========================================================================
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import struct
import os
class InputStream(object):
    """Abstract base class for the input streams in this module.

    Only ``readByte`` and ``readFully`` carry default behaviour, and both
    simply forward to ``read``.  Every other method raises
    ``NotImplementedError`` until a subclass overrides it.
    """

    # -- primitive read, to be supplied by subclasses ----------------------

    def read(self, length):
        """Read ``length`` worth of data; subclasses must override."""
        raise NotImplementedError()

    # -- conveniences built on top of read() -------------------------------

    def readByte(self):
        """Return whatever ``read(1)`` returns."""
        single = self.read(1)
        return single

    def readFully(self, length):
        """Return whatever ``read(length)`` returns."""
        everything = self.read(length)
        return everything

    # -- optional operations, unimplemented in the base class --------------

    def available(self):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def skip(self, n):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def markSupported(self):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def mark(self, read_limit):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def reset(self):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def close(self):
        """Not implemented in the base class."""
        raise NotImplementedError()
class ByteArrayInputStream(InputStream):
    """In-memory input stream over a window of a sliceable byte sequence.

    The window starts at ``offset`` and covers ``length`` items of the
    buffer.  Reads and skips consume the window from the front and never
    step outside it.
    """

    def __init__(self, data='', offset=0, length=0):
        """Create a stream over ``data``; see ``reset`` for the arguments."""
        self.reset(data, offset, length)

    def reset(self, data, offset=0, length=0):
        """Point the stream at a new buffer and rewind to ``offset``.

        Args:
            data: sliceable sequence to read from.
            offset: index of the first item of the window.
            length: number of items in the window.  A value of 0 with
                non-empty ``data`` means "everything from ``offset`` to
                the end of ``data``".
        """
        if data and not length:
            length = len(data) - offset
        self._buffer = data
        self._offset = offset
        self._count = length
        # Absolute end index of the window.  It is clamped to the buffer so
        # an oversized ``length`` cannot inflate size(), and never placed
        # before ``offset`` so size() cannot go negative.
        buffer_length = len(data) if data else 0
        self._end = max(offset, min(offset + length, buffer_length))

    def size(self):
        """Return the number of items left between the cursor and the end
        of the window."""
        return self._end - self._offset

    def available(self):
        """Return the number of items that can still be read."""
        return self.size()

    def toByteArray(self):
        """Return the unread remainder of the window without consuming it."""
        return self._buffer[self._offset:self._end]

    def close(self):
        """No-op; there is nothing to release."""
        pass

    def flush(self):
        """No-op; there is nothing to flush."""
        pass

    def read(self, length):
        """Return up to ``length`` items and advance the cursor past them.

        Fewer items (possibly none) are returned when the window is
        exhausted.  The cursor moves only by the amount actually returned.
        """
        if length <= 0:
            # Empty slice of the buffer's own type; the cursor stays put.
            return self._buffer[self._offset:self._offset]
        stop = min(self._offset + length, self._end)
        chunk = self._buffer[self._offset:stop]
        self._offset += len(chunk)
        return chunk

    def skip(self, n):
        """Advance the cursor by up to ``n`` items.

        Returns:
            The number of items actually skipped (0 for ``n <= 0`` or when
            the window is exhausted).
        """
        if n <= 0:
            return 0
        step = min(n, self._end - self._offset)
        self._offset += step
        return step
class FileInputStream(InputStream):
    """Input stream that reads raw bytes from a file opened by path.

    Instances can be used as context managers; the file is closed when
    the ``with`` block exits.
    """

    # Upper bound on how much skip() reads (and throws away) per call to
    # the underlying file, so skipping a large span does not allocate it
    # all at once.
    _SKIP_CHUNK_SIZE = 64 * 1024

    def __init__(self, path):
        """Open ``path`` in binary mode and record its size.

        Any error raised while sizing the file is propagated after the
        freshly opened handle has been closed.
        """
        self._fd = open(path, 'rb')
        try:
            self._length = os.path.getsize(path)
        except Exception:
            # Do not leak the handle when the size lookup fails.
            self._fd.close()
            raise

    def __enter__(self):
        """Return the stream itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the file; exceptions from the block are not suppressed."""
        self.close()
        return False

    def length(self):
        """Return the file size recorded when the stream was opened."""
        return self._length

    def close(self):
        """Close the underlying file."""
        self._fd.close()

    def seek(self, offset):
        """Move the read position to the absolute ``offset``."""
        self._fd.seek(offset)

    def getPos(self):
        """Return the current read position."""
        return self._fd.tell()

    def readByte(self):
        """Return the next byte as a bytes object (empty at end of file)."""
        return self._fd.read(1)

    def read(self, length):
        """Return up to ``length`` bytes, fewer only at end of file.

        The underlying file is read repeatedly until ``length`` bytes have
        been collected or a read returns no data.
        """
        byte_buffer = []
        while length > 0:
            data = self._fd.read(length)
            if not data:
                break
            byte_buffer.append(data)
            length -= len(data)
        return b''.join(byte_buffer)

    def skip(self, n):
        """Consume and discard up to ``n`` bytes.

        Returns:
            The number of bytes actually skipped; less than ``n`` when the
            end of the file is reached, 0 when ``n <= 0``.
        """
        skip_length = 0
        while n > 0:
            # Read in bounded pieces rather than materialising n bytes.
            data = self._fd.read(min(n, self._SKIP_CHUNK_SIZE))
            if not data:
                break
            data_length = len(data)
            skip_length += data_length
            n -= data_length
        return skip_length
class DataInputStream(InputStream):
    """Decode big-endian primitive values from an underlying InputStream.

    Each ``readX`` method pulls a fixed number of bytes from the wrapped
    stream and decodes them with ``struct``.  When the wrapped stream
    returns fewer bytes than required, ``struct.unpack`` raises
    ``struct.error``.
    """

    def __init__(self, input_stream):
        """Wrap ``input_stream``, which must be an ``InputStream``."""
        assert isinstance(input_stream, InputStream)
        self._stream = input_stream

    # -- plain delegation to the wrapped stream ----------------------------

    def close(self):
        """Close the wrapped stream and return its result."""
        return self._stream.close()

    def seek(self, offset):
        """Delegate to the wrapped stream's ``seek``."""
        return self._stream.seek(offset)

    def getPos(self):
        """Delegate to the wrapped stream's ``getPos``."""
        return self._stream.getPos()

    def length(self):
        """Delegate to the wrapped stream's ``length``."""
        return self._stream.length()

    def read(self, length):
        """Return the raw result of reading ``length`` from the wrapped
        stream, undecoded."""
        return self._stream.read(length)

    def skipBytes(self, n):
        """Delegate to the wrapped stream's ``skip`` and return its result."""
        return self._stream.skip(n)

    # -- decoded reads -----------------------------------------------------

    def readByte(self):
        """Read one byte and return it as a signed integer."""
        data = self._stream.read(1)
        return struct.unpack(">b", data)[0]

    def readFully(self, length):
        """Read ``length`` bytes and return them as a list of signed ints.

        The bytes are fetched with a single read of the wrapped stream and
        decoded with one repeat-count format instead of byte by byte.

        Raises:
            struct.error: if fewer than ``length`` bytes are available.
        """
        if length <= 0:
            return []
        data = self._stream.read(length)
        return list(struct.unpack(">%db" % length, data))

    def readUByte(self):
        """Read one byte and return it as an unsigned integer."""
        data = self._stream.read(1)
        return struct.unpack("B", data)[0]

    def readBoolean(self):
        """Read one byte and return it as a bool."""
        data = self._stream.read(1)
        return struct.unpack(">?", data)[0]

    def readInt(self):
        """Read four bytes and return a big-endian signed integer."""
        data = self._stream.read(4)
        return struct.unpack(">i", data)[0]

    def readLong(self):
        """Read eight bytes and return a big-endian signed integer."""
        data = self._stream.read(8)
        return struct.unpack(">q", data)[0]

    def readFloat(self):
        """Read four bytes and return a big-endian single-precision float."""
        data = self._stream.read(4)
        return struct.unpack(">f", data)[0]

    def readDouble(self):
        """Read eight bytes and return a big-endian double-precision float."""
        data = self._stream.read(8)
        return struct.unpack(">d", data)[0]
class DataInputBuffer(DataInputStream):
    """DataInputStream whose wrapped stream is a ByteArrayInputStream.

    ``reset``, ``size`` and ``toByteArray`` forward to that wrapped
    stream.
    """

    def __init__(self, data='', offset=0, length=0):
        """Build a ByteArrayInputStream from the arguments and wrap it."""
        super(DataInputBuffer, self).__init__(
            ByteArrayInputStream(data, offset, length)
        )

    def toByteArray(self):
        """Return the result of the wrapped stream's ``toByteArray``."""
        backing = self._stream
        return backing.toByteArray()

    def size(self):
        """Return the result of the wrapped stream's ``size``."""
        backing = self._stream
        return backing.size()

    def reset(self, data, offset=0, length=0):
        """Forward the new buffer arguments to the wrapped stream's
        ``reset``."""
        backing = self._stream
        backing.reset(data, offset, length)