diff --git a/.gitignore b/.gitignore index 4b4e909..4071ed6 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,5 @@ **/*.seq +**/__pycache__/ +**.egg-info/ +build/ +dist/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative 
Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README b/README deleted file mode 100644 index fe2842b..0000000 --- a/README +++ /dev/null @@ -1,14 +0,0 @@ -Pure Python SequenceFile Reader and Writer implementation -that allows you to read and write your Hadoop sequence files -without using java. - -Author: Matteo Bertozzi - -Contributors: - - * Brian Bloniarz - * Alex Roper - * Jeremy G. Kahn - --- -Added the support for BytesWritable method (found in forks repos) diff --git a/README.md b/README.md new file mode 100644 index 0000000..d7d134c --- /dev/null +++ b/README.md @@ -0,0 +1,38 @@ +Python Hadoop I/O Utilities +=========================== + +Pure Python SequenceFile Reader and Writer implementation +that allows you to read and write Hadoop sequence files +without using Java. 
+ +## Installation + +``` +python setup.py install +``` + +or in your project requirements.txt: +``` +-e git+https://github.com/commoncrawl/python-hadoop.git@main#egg=hadoop +``` + +## Usage + +See [examples](./examples/) for how to read and write SequenceFiles and other +file formats specific to Hadoop and MapReduce. + +## Credits + +Author: Matteo Bertozzi +(see the [original repository](//github.com/matteobertozzi/Hadoop)) + +Contributions to this fork: +- via [bityon/python-hadoop](//github.com/bityon/python-hadoop) + * Brian Bloniarz + * Alex Roper + * Jeremy G. Kahn +- Python 3 migration + * Jing Conan Wang (@jingcwang: [jingcwang/Hadoop](//github.com/jingcwang/Hadoop)) + * Jie Tang (@jtang7: [jtang7/Hadoop](//github.com/jtang7/Hadoop)) + +See the commit logs for a complete list of contributors. \ No newline at end of file diff --git a/examples/ArrayFileTest.py b/examples/ArrayFileTest.py index 64db0c0..7747d31 100644 --- a/examples/ArrayFileTest.py +++ b/examples/ArrayFileTest.py @@ -22,7 +22,7 @@ if __name__ == '__main__': writer = ArrayFile.Writer('array-test', IntWritable) writer.INDEX_INTERVAL = 16 - for i in xrange(0, 100): + for i in range(0, 100): writer.append(IntWritable(1 + i * 10)) writer.close() @@ -30,25 +30,25 @@ value = IntWritable() reader = ArrayFile.Reader('array-test') while reader.next(key, value): - print key, value - - print 'GET 8' - print reader.get(8, value) - print value - print - - print 'GET 110' - print reader.get(110, value) - print - - print 'GET 25' - print reader.get(25, value) - print value - print - - print 'GET 55' - print reader.get(55, value) - print value - print + print(key, value) + + print('GET 8') + print(reader.get(8, value)) + print(value) + print() + + print('GET 110') + print(reader.get(110, value)) + print() + + print('GET 25') + print(reader.get(25, value)) + print(value) + print() + + print('GET 55') + print(reader.get(55, value)) + print(value) + print() reader.close() diff --git a/examples/MapFileTest.py
b/examples/MapFileTest.py index 6fc45e8..ca15464 100644 --- a/examples/MapFileTest.py +++ b/examples/MapFileTest.py @@ -22,7 +22,7 @@ if __name__ == '__main__': writer = MapFile.Writer('map-test', LongWritable, LongWritable) writer.INDEX_INTERVAL = 2 - for i in xrange(0, 100, 2): + for i in range(0, 100, 2): writer.append(LongWritable(i), LongWritable(i * 10)) writer.close() @@ -30,32 +30,32 @@ value = LongWritable() reader = MapFile.Reader('map-test') while reader.next(key, value): - print key, value + print(key, value) - print 'MID KEY', reader.midKey() - print 'FINAL KEY', reader.finalKey(key), key + print('MID KEY', reader.midKey()) + print('FINAL KEY', reader.finalKey(key), key) - print 'GET CLOSEST' + print('GET CLOSEST') key.set(8) - print reader.get(key, value) - print value - print + print(reader.get(key, value)) + print(value) + print() - print 'GET 111' + print('GET 111') key.set(111) - print reader.get(key, value) - print + print(reader.get(key, value)) + print() key.set(25) - print 'SEEK 25 before' - print reader.getClosest(key, value, before=True) - print value - print + print('SEEK 25 before') + print(reader.getClosest(key, value, before=True)) + print(value) + print() key.set(55) - print 'SEEK 55' - print reader.getClosest(key, value) - print value - print + print('SEEK 55') + print(reader.getClosest(key, value)) + print(value) + print() reader.close() diff --git a/examples/SequenceFileMeta.py b/examples/SequenceFileMeta.py index 029c971..43adda9 100644 --- a/examples/SequenceFileMeta.py +++ b/examples/SequenceFileMeta.py @@ -25,10 +25,10 @@ def writeData(writer): key = LongWritable() value = LongWritable() - for i in xrange(10): + for i in range(10): key.set(1000 - i) value.set(i) - print '[%d] %s %s' % (writer.getLength(), key.toString(), value.toString()) + print('[%d] %s %s' % (writer.getLength(), key.toString(), value.toString())) writer.append(key, value) def testWrite(filename): @@ -45,7 +45,7 @@ def testRead(filename): metadata = 
reader.getMetadata() for meta_key, meta_value in metadata: - print 'METADATA:', meta_key, meta_value + print('METADATA:', meta_key, meta_value) key_class = reader.getKeyClass() value_class = reader.getValueClass() @@ -55,8 +55,8 @@ def testRead(filename): position = reader.getPosition() while reader.next(key, value): - print '*' if reader.syncSeen() else ' ', - print '[%6s] %6s %6s' % (position, key.toString(), value.toString()) + print('*' if reader.syncSeen() else ' ', end=' ') + print('[%6s] %6s %6s' % (position, key.toString(), value.toString())) position = reader.getPosition() reader.close() diff --git a/examples/SequenceFileReader.py b/examples/SequenceFileReader.py index 7af7995..64ac9e5 100644 --- a/examples/SequenceFileReader.py +++ b/examples/SequenceFileReader.py @@ -22,7 +22,7 @@ if __name__ == '__main__': if len(sys.argv) < 2: - print 'usage: SequenceFileReader ' + print('usage: SequenceFileReader ') else: reader = SequenceFile.Reader(sys.argv[1]) @@ -35,8 +35,8 @@ #reader.sync(4042) position = reader.getPosition() while reader.next(key, value): - print '*' if reader.syncSeen() else ' ', - print '[%6s] %6s %6s' % (position, key.toString(), value.toString()) + print('*' if reader.syncSeen() else ' ', end=' ') + print('[%6s] %6s %6s' % (position, key.toString(), value.toString())) position = reader.getPosition() reader.close() diff --git a/examples/SequenceFileReaderCount.py b/examples/SequenceFileReaderCount.py index dd9ab7a..2225aab 100644 --- a/examples/SequenceFileReaderCount.py +++ b/examples/SequenceFileReaderCount.py @@ -22,7 +22,7 @@ if __name__ == '__main__': if len(sys.argv) < 2: - print 'usage: SequenceFileReader ' + print('usage: SequenceFileReaderCount ') else: reader = SequenceFile.Reader(sys.argv[1]) @@ -33,14 +33,12 @@ value = value_class() counter = 0 - + position = reader.getPosition() while reader.next(key, value): - #print '*' if reader.syncSeen() else ' ', - #print '[%6s] %6s %6s' % (position, key.toString(), value.toString()) 
counter += 1 position = reader.getPosition() reader.close() - print counter + print(counter) diff --git a/examples/SequenceFileWriterDemo.py b/examples/SequenceFileWriterDemo.py index 0ebe99a..2460f00 100644 --- a/examples/SequenceFileWriterDemo.py +++ b/examples/SequenceFileWriterDemo.py @@ -24,10 +24,10 @@ def writeData(writer): key = LongWritable() value = LongWritable() - for i in xrange(1000): + for i in range(1000): key.set(1000 - i) value.set(i) - print '[%d] %s %s' % (writer.getLength(), key.toString(), value.toString()) + print('[%d] %s %s' % (writer.getLength(), key.toString(), value.toString())) writer.append(key, value) if __name__ == '__main__': diff --git a/examples/SequenceFileWriterDemoBytes.py b/examples/SequenceFileWriterDemoBytes.py index 06295c4..684618a 100644 --- a/examples/SequenceFileWriterDemoBytes.py +++ b/examples/SequenceFileWriterDemoBytes.py @@ -25,11 +25,12 @@ def writeData(writer): key = BytesWritable() value = BytesWritable() - # for i in xrange(1000): - key.set("A") - value.set("B") - print '[%d] %s %s' % (writer.getLength(), key.toString(), value.toString()) - writer.append(key, value) + iterations = 3 + for _i in range(iterations): + key.set(b'A') + value.set(b'A') + print('[%d] %s %s' % (writer.getLength(), key.toString(), value.toString())) + writer.append(key, value) if __name__ == '__main__': writer = SequenceFile.createWriter('test-bytes.seq', BytesWritable, BytesWritable) diff --git a/examples/SetFileTest.py b/examples/SetFileTest.py index 21431f2..e42837a 100644 --- a/examples/SetFileTest.py +++ b/examples/SetFileTest.py @@ -22,33 +22,33 @@ if __name__ == '__main__': writer = SetFile.Writer('set-test', IntWritable) writer.INDEX_INTERVAL = 16 - for i in xrange(0, 100, 2): + for i in range(0, 100, 2): writer.append(IntWritable(i * 10)) writer.close() key = IntWritable() reader = SetFile.Reader('set-test') while reader.next(key): - print key + print(key) - print 'GET 8' + print('GET 8') key.set(8) - print reader.get(key) - 
print + print(reader.get(key)) + print() - print 'GET 120' + print('GET 120') key.set(120) - print reader.get(key) - print + print(reader.get(key)) + print() - print 'GET 240' + print('GET 240') key.set(240) - print reader.get(key) - print + print(reader.get(key)) + print() - print 'GET 550' + print('GET 550') key.set(550) - print reader.get(key) - print + print(reader.get(key)) + print() reader.close() diff --git a/hadoop/__init__.py b/hadoop/__init__.py index 8017cd6..2458452 100644 --- a/hadoop/__init__.py +++ b/hadoop/__init__.py @@ -16,6 +16,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import io -import util +"""Python Hadoop I/O Utilities + +Pure Python SequenceFile Reader and Writer implementation +that allows you to read and write Hadoop sequence files +without using Java. +""" + +from . import io +from . import util diff --git a/hadoop/io/ArrayFile.py b/hadoop/io/ArrayFile.py index b604a24..7f33381 100644 --- a/hadoop/io/ArrayFile.py +++ b/hadoop/io/ArrayFile.py @@ -16,8 +16,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from IntWritable import LongWritable -import MapFile +from .IntWritable import LongWritable +from . 
import MapFile + class Writer(MapFile.Writer): def __init__(self, path, value_class): @@ -28,6 +29,7 @@ def append(self, value): super(Writer, self).append(LongWritable(self._count), value) self._count += 1 + class Reader(MapFile.Reader): def __init__(self, path): super(Reader, self).__init__(path) @@ -45,5 +47,5 @@ def key(self): def get(self, n, value): self._key.set(n) - return(super(Reader, self).get(self._key, value)) + return super(Reader, self).get(self._key, value) diff --git a/hadoop/io/BytesWritable.py b/hadoop/io/BytesWritable.py index 81639d8..35ca51f 100644 --- a/hadoop/io/BytesWritable.py +++ b/hadoop/io/BytesWritable.py @@ -16,45 +16,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -from Writable import WritableComparable -from WritableUtils import readVInt, writeVInt +from .Writable import AbstractValueWritable +from .WritableUtils import intToByte -class BytesWritable(WritableComparable): - def __init__(self): - self._bytes = '' - self._length = 0 - - def getBytes(self): - return self._bytes - - def getLength(self): - return self._length - - def set(self, value): - self._bytes = value - self._length = len(self._bytes) - - def append(self, value): - new_bytes = value - self._bytes += new_bytes - self._length += len(new_bytes) - - def clear(self): - self._length = 0 - self._bytes = '' +class BytesWritable(AbstractValueWritable): def write(self, data_output): - writeVInt(data_output, self._length) - data_output.write(self._bytes) + data_output.writeInt(len(self._value)) + data_output.write(self._value) def readFields(self, data_input): - self._length = readVInt(data_input) - self._bytes = data_input.read(self._length) - - def equal(self, other): - if not isinstance(other, BytesWritable): - return False - return self._bytes == other._bytes and self._length and other._length + size = data_input.readInt() + self._value = data_input.readFully(size) def toString(self): - return self._bytes + 
return b''.join(intToByte(x) for x in self._value) diff --git a/hadoop/io/FloatWritable.py b/hadoop/io/FloatWritable.py index 35d2ad8..e45389c 100644 --- a/hadoop/io/FloatWritable.py +++ b/hadoop/io/FloatWritable.py @@ -16,7 +16,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from Writable import AbstractValueWritable +from .Writable import AbstractValueWritable + class FloatWritable(AbstractValueWritable): def write(self, data_output): @@ -25,6 +26,7 @@ def write(self, data_output): def readFields(self, data_input): self._value = data_input.readFloat() + class DoubleWritable(AbstractValueWritable): def write(self, data_output): data_output.writeDouble(self._value) diff --git a/hadoop/io/InputStream.py b/hadoop/io/InputStream.py index ea5acd7..a29bb91 100644 --- a/hadoop/io/InputStream.py +++ b/hadoop/io/InputStream.py @@ -19,6 +19,7 @@ import struct import os + class InputStream(object): def available(self): raise NotImplementedError @@ -35,6 +36,9 @@ def markSupported(self): def readByte(self): return self.read(1) + def readFully(self, length): + return self.read(length) + def read(self, length): raise NotImplementedError @@ -44,6 +48,7 @@ def reset(self): def skip(self, n): raise NotImplementedError + class ByteArrayInputStream(InputStream): def __init__(self, data='', offset=0, length=0): self.reset(data, offset, length) @@ -52,7 +57,7 @@ def size(self): return self._count - self._offset def toByteArray(self): - return self._buffer[self._offset:self._offset+self._count] + return self._buffer[self._offset:self._offset + self._count] def reset(self, data, offset=0, length=0): if data and not length: @@ -68,10 +73,11 @@ def flush(self): pass def read(self, length): - data = self._buffer[self._offset:self._offset+length] + data = self._buffer[self._offset:self._offset + length] self._offset += length return data + class FileInputStream(InputStream): def __init__(self, path): self._fd = open(path, 'rb') @@ 
-102,7 +108,7 @@ def read(self, length): data_length = len(data) byte_buffer.append(data) length -= data_length - return ''.join(byte_buffer) + return b''.join(byte_buffer) def skip(self, n): skip_length = 0 @@ -116,6 +122,7 @@ def skip(self, n): n -= data_length return skip_length + class DataInputStream(InputStream): def __init__(self, input_stream): assert isinstance(input_stream, InputStream) @@ -140,6 +147,9 @@ def readByte(self): data = self._stream.read(1) return struct.unpack(">b", data)[0] + def readFully(self, length): + return [self.readByte() for _ in range(length)] + def readUByte(self): data = self._stream.read(1) return struct.unpack("B", data)[0] @@ -167,6 +177,7 @@ def readDouble(self): def skipBytes(self, n): return self._stream.skip(n) + class DataInputBuffer(DataInputStream): def __init__(self, data='', offset=0, length=0): input_stream = ByteArrayInputStream(data, offset, length) diff --git a/hadoop/io/IntWritable.py b/hadoop/io/IntWritable.py index eb3169a..98dc4ea 100644 --- a/hadoop/io/IntWritable.py +++ b/hadoop/io/IntWritable.py @@ -16,8 +16,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from Writable import AbstractValueWritable -from WritableUtils import readVInt, readVLong, writeVInt, writeVLong +from .Writable import AbstractValueWritable +from .WritableUtils import readVInt, readVLong, writeVInt, writeVLong + class IntWritable(AbstractValueWritable): def write(self, data_output): @@ -26,6 +27,7 @@ def write(self, data_output): def readFields(self, data_input): self._value = data_input.readInt() + class LongWritable(AbstractValueWritable): def write(self, data_output): data_output.writeLong(self._value) @@ -33,6 +35,7 @@ def write(self, data_output): def readFields(self, data_input): self._value = data_input.readLong() + class VIntWritable(AbstractValueWritable): def write(self, data_output): writeVInt(data_output, self._value) @@ -40,6 +43,7 @@ def write(self, data_output): def readFields(self, data_input): self._value = readVInt(data_input) + class VLongWritable(AbstractValueWritable): def write(self, data_output): writeVLong(data_output, self._value) diff --git a/hadoop/io/Lz4Codec.py b/hadoop/io/Lz4Codec.py new file mode 100755 index 0000000..a76dfbd --- /dev/null +++ b/hadoop/io/Lz4Codec.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# ======================================================================== +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import lz4_raw +import struct +from io import BytesIO + +from hadoop.io.InputStream import DataInputBuffer + +DEFAULT_BUF_SIZE = 256 * 1024 + + +class Lz4Codec: + def __init__(self): + self.uncompressedBytes = 0 + self.blockSize = 0 + + def compress(self, data): + raise NotImplementedError + + def decompress(self, data): + io = BytesIO(data)  # compressed input is bytes; StringIO rejects bytes on Python 3 + if self.uncompressedBytes >= self.blockSize: + self.blockSize = struct.unpack('>I', io.read(4))[0] + self.uncompressedBytes = 0 + if self.blockSize == 0: + return b""  # keep the return type consistent with the decompressed bytes below + + trunkSize = struct.unpack('>I', io.read(4))[0] + f = lz4_raw.decompress(io.read(trunkSize), DEFAULT_BUF_SIZE) + self.uncompressedBytes += len(f) + return f + + def decompressInputStream(self, data): + return DataInputBuffer(self.decompress(data)) diff --git a/hadoop/io/MapFile.py b/hadoop/io/MapFile.py index a9e87cc..bc690a2 100644 --- a/hadoop/io/MapFile.py +++ b/hadoop/io/MapFile.py @@ -18,12 +18,13 @@ import os -from IntWritable import LongWritable -import SequenceFile +from .IntWritable import LongWritable +from . import SequenceFile INDEX_FILE_NAME = 'index' DATA_FILE_NAME = 'data' + class Writer(object): INDEX_INTERVAL = 128 @@ -59,6 +60,7 @@ def append(self, key, value): def _checkKey(self, key): pass + class Reader(object): INDEX_SKIP = 0 diff --git a/hadoop/io/NullWritable.py b/hadoop/io/NullWritable.py index c8215f2..4f9dd01 100644 --- a/hadoop/io/NullWritable.py +++ b/hadoop/io/NullWritable.py @@ -16,7 +16,8 @@ # See the License for the specific language governing permissions and # limitations under the License.
-from Writable import WritableComparable +from .Writable import WritableComparable + class NullWritable(WritableComparable): def __new__(cls, *p, **k): diff --git a/hadoop/io/OutputStream.py b/hadoop/io/OutputStream.py index 7aa98bb..d69430f 100644 --- a/hadoop/io/OutputStream.py +++ b/hadoop/io/OutputStream.py @@ -18,6 +18,7 @@ import struct + class OutputStream(object): def close(self): raise NotImplementedError @@ -31,6 +32,7 @@ def writeByte(self, byte): def write(self, data): raise NotImplementedError + class FileOutputStream(OutputStream): def __init__(self, path): self._fd = open(path, 'wb') @@ -51,8 +53,11 @@ def writeByte(self, value): return self._fd.write(value) def write(self, value): + if type(value) is str: + value = value.encode('utf-8') return self._fd.write(value) + class DataOutputStream(object): def __init__(self, output_stream): assert isinstance(output_stream, OutputStream) @@ -108,6 +113,7 @@ def writeDouble(self, value): def skipBytes(self, n): return self._stream.skip(n) + class ByteArrayOutputStream(OutputStream): def __init__(self): self._buffer = [] @@ -117,7 +123,7 @@ def size(self): return self._count def toByteArray(self): - return ''.join(self._buffer) + return b''.join(self._buffer) def reset(self): self._buffer = [] @@ -129,9 +135,10 @@ def close(self): def flush(self): pass - def write(self, bytes): - self._buffer.append(bytes) - self._count += len(bytes) + def write(self, data): + self._buffer.append(data) + self._count += len(data) + class DataOutputBuffer(DataOutputStream): def __init__(self): diff --git a/hadoop/io/SequenceFile.py b/hadoop/io/SequenceFile.py index 767e9b0..1646f5b 100644 --- a/hadoop/io/SequenceFile.py +++ b/hadoop/io/SequenceFile.py @@ -23,21 +23,21 @@ from hadoop.util.ReflectionUtils import hadoopClassFromName, hadoopClassName -from compress import CodecPool +from .compress import CodecPool -from WritableUtils import readVInt, writeVInt -from Writable import Writable -from OutputStream import 
FileOutputStream, DataOutputStream, DataOutputBuffer -from InputStream import FileInputStream, DataInputStream, DataInputBuffer -from VersionMismatchException import VersionMismatchException, VersionPrefixException +from .WritableUtils import readVInt, writeVInt, intToByte +from .Writable import Writable +from .OutputStream import FileOutputStream, DataOutputStream, DataOutputBuffer +from .InputStream import FileInputStream, DataInputStream, DataInputBuffer +from .VersionMismatchException import VersionMismatchException, VersionPrefixException -from Text import Text +from .Text import Text -BLOCK_COMPRESS_VERSION = '\x04' -CUSTOM_COMPRESS_VERSION = '\x05' -VERSION_WITH_METADATA = '\x06' -VERSION_PREFIX = 'SEQ' -VERSION = VERSION_PREFIX + VERSION_WITH_METADATA +BLOCK_COMPRESS_VERSION = 0x04 +CUSTOM_COMPRESS_VERSION = 0x05 +VERSION_WITH_METADATA = 0x06 +VERSION_PREFIX = b'SEQ' +VERSION = VERSION_PREFIX + intToByte(VERSION_WITH_METADATA) SYNC_ESCAPE = -1 SYNC_HASH_SIZE = 16 @@ -45,10 +45,12 @@ SYNC_INTERVAL = 100 * SYNC_SIZE + class CompressionType: NONE = 0 RECORD = 1 - BLOCK = 2 + BLOCK = 2 + class Metadata(Writable): def __init__(self, metadata=None): @@ -75,15 +77,18 @@ def values(self): def itervalues(self): return self._meta.itervalues() + def items(self): + return self._meta.items() + def iteritems(self): - return self._meta.iteritems() + return iter(self._meta.items()) def __iter__(self): - return self._meta.iteritems() + return self.iteritems() def write(self, data_output): data_output.writeInt(len(self._meta)) - for key, value in self._meta.iteritems(): + for key, value in self._meta.items(): Text.writeString(data_output, key) Text.writeString(data_output, value) @@ -92,12 +97,13 @@ def readFields(self, data_input): if count < 0: raise IOError("Invalid size: %d for file metadata object" % count) - for i in xrange(count): + for _i in range(count): key = Text.readString(data_input) value = Text.readString(data_input) self._meta[key] = value -def 
createWriter(path, key_class, value_class, metadata=None, compression_type=CompressionType.NONE): +def createWriter(path, key_class, value_class, metadata=None, + compression_type=CompressionType.NONE): kwargs = {} if compression_type == CompressionType.NONE: @@ -118,6 +124,7 @@ def createRecordWriter(path, key_class, value_class, metadata=None): def createBlockWriter(path, key_class, value_class, metadata=None): return Writer(path, key_class, value_class, metadata, compress=True, block_compress=True) + class Writer(object): COMPRESSION_BLOCK_SIZE = 1000000 @@ -145,7 +152,8 @@ def __init__(self, path, key_class, value_class, metadata, compress=False, block self._stream = DataOutputStream(FileOutputStream(path)) # sync is 16 random bytes - self._sync = md5('%s@%d' % (uuid1().bytes, int(time() * 1000))).digest() + self._sync = md5(b'%s@%d' + % (uuid1().bytes, int(time() * 1000))).digest() self._writeFileHeader() @@ -272,6 +280,7 @@ def _checkAndWriteSync(self): if self._stream.getPos() >= (self._last_sync + SYNC_INTERVAL): self.sync() + class Reader(object): def __init__(self, path, start=0, length=0): self._block_compressed = False @@ -298,7 +307,7 @@ def getCompressionCodec(self): def getKeyClass(self): if not self._key_class: - self._key_class = hadoopClassFromName(self._key_class_name) + self._key_class = hadoopClassFromName(self._key_class_name) return self._key_class def getKeyClassName(self): @@ -306,7 +315,7 @@ def getKeyClassName(self): def getValueClass(self): if not self._value_class: - self._value_class = hadoopClassFromName(self._value_class_name) + self._value_class = hadoopClassFromName(self._value_class_name) return self._value_class def getValueClassName(self): @@ -353,6 +362,9 @@ def nextRawKey(self): raise IOError("File is corrupt") self._sync_seen = True + if self._stream.getPos() >= self._end: + return None + def _readBuffer(): length = readVInt(self._stream) buf = self._stream.read(length) @@ -374,7 +386,7 @@ def _readBuffer(): def nextKey(self, 
key): buf = self.nextRawKey() if not buf: - return False + return False key.readFields(buf) return True @@ -450,18 +462,18 @@ def _initialize(self, path, start, length): version_block[0:len(VERSION_PREFIX)]) self._version = version_block[len(VERSION_PREFIX)] - if self._version > VERSION[len(VERSION_PREFIX)]: - raise VersionMismatchException(VERSION[len(VERSION_PREFIX)], + if self._version > VERSION_WITH_METADATA: + raise VersionMismatchException(VERSION_WITH_METADATA, self._version) if self._version < BLOCK_COMPRESS_VERSION: - # Same as below, but with UTF8 Deprecated Class + # Same as below, but with UTF-8 Deprecated Class raise NotImplementedError else: self._key_class_name = Text.readString(self._stream) self._value_class_name = Text.readString(self._stream) - if ord(self._version) > 2: + if self._version > 2: self._decompress = self._stream.readBoolean() else: self._decompress = False diff --git a/hadoop/io/SetFile.py b/hadoop/io/SetFile.py index bf48886..391273a 100644 --- a/hadoop/io/SetFile.py +++ b/hadoop/io/SetFile.py @@ -16,8 +16,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from NullWritable import NullWritable -import MapFile +from .NullWritable import NullWritable +from . import MapFile + class Writer(MapFile.Writer): def __init__(self, path, key_class): @@ -26,6 +27,7 @@ def __init__(self, path, key_class): def append(self, key): return super(Writer, self).append(key, NullWritable()) + class Reader(MapFile.Reader): def next(self, key): return super(Reader, self).next(key, NullWritable()) diff --git a/hadoop/io/Text.py b/hadoop/io/Text.py index df2b580..45a59c3 100644 --- a/hadoop/io/Text.py +++ b/hadoop/io/Text.py @@ -16,8 +16,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from Writable import WritableComparable -from WritableUtils import readVInt, writeVInt +from .Writable import WritableComparable +from .WritableUtils import readVInt, writeVInt + class Text(WritableComparable): def __init__(self): @@ -62,20 +63,19 @@ def toString(self): @staticmethod def readString(data_input): length = readVInt(data_input) - bytes = data_input.read(length) - return Text.decode(bytes) + return Text.decode(data_input.read(length)) @staticmethod - def writeString(data_output, bytes): - bytes = Text.encode(bytes) - writeVInt(data_output, len(bytes)) - data_output.write(bytes) + def writeString(data_output, data): + data = Text.encode(data) + writeVInt(data_output, len(data)) + data_output.write(data) @staticmethod - def encode(bytes): - return bytes.encode('utf-8') + def encode(data): + return data.encode('utf-8') @staticmethod - def decode(bytes): - return bytes.decode('utf-8') + def decode(data): + return data.decode('utf-8') diff --git a/hadoop/io/VersionMismatchException.py b/hadoop/io/VersionMismatchException.py index 382660b..8edd1c1 100644 --- a/hadoop/io/VersionMismatchException.py +++ b/hadoop/io/VersionMismatchException.py @@ -15,6 +15,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + + class VersionPrefixException(Exception): def __init__(self, expected, discovered): self.expected_prefix = expected @@ -23,13 +25,14 @@ def __str__(self): return "Sequence file prefix found %r but expected %r" \ % (self.discovered_prefix, self.expected_prefix) + class VersionMismatchException(Exception): def __init__(self, expected_version, founded_version): self.expected_version = expected_version self.founded_version = founded_version def toString(self): - return "A record version mismatch occured. Expecting %r, found %r" \ + return "A record version mismatch occurred. 
Expecting %r, found %r" \ % (self.expected_version, self.founded_version) def __str__(self): diff --git a/hadoop/io/Writable.py b/hadoop/io/Writable.py index b5f0458..c68cd16 100644 --- a/hadoop/io/Writable.py +++ b/hadoop/io/Writable.py @@ -16,6 +16,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + class Writable(object): def write(self, data_output): raise NotImplementedError @@ -29,10 +30,12 @@ def toString(self): def __repr__(self): return self.toString() + class WritableComparable(Writable): def compareTo(self, other): raise NotImplementedError + class AbstractValueWritable(WritableComparable): def __init__(self, value=None): assert not isinstance(value, type(self)), (type(self._value)) diff --git a/hadoop/io/WritableUtils.py b/hadoop/io/WritableUtils.py index 6153339..a04e89d 100644 --- a/hadoop/io/WritableUtils.py +++ b/hadoop/io/WritableUtils.py @@ -16,6 +16,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import six + + +def intToByte(version): + if six.PY2: + return chr(version % 256) + else: + return version.to_bytes(1, 'little', signed=True) + def readVInt(data_input): return readVLong(data_input) @@ -26,7 +35,7 @@ def readVLong(data_input): return first_byte i = 0 - for idx in xrange(length - 1): + for _idx in range(length - 1): b = data_input.readUByte() i = i << 8 i = i | b diff --git a/hadoop/io/__init__.py b/hadoop/io/__init__.py index c73675d..92374f8 100644 --- a/hadoop/io/__init__.py +++ b/hadoop/io/__init__.py @@ -15,19 +15,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Hadoop I/O Python Classes""" -import OutputStream -import InputStream +from . import OutputStream +from . import InputStream -import SequenceFile -import ArrayFile -import MapFile -import SetFile +from . 
import SequenceFile +from . import ArrayFile +from . import MapFile +from . import SetFile -from Writable import * -from IntWritable import * -from Text import * -from BytesWritable import * -import WritableUtils +from .Writable import * +from .IntWritable import * +from .BytesWritable import * +from .Text import * +from .NullWritable import * +from . import WritableUtils -import compress +from . import compress diff --git a/hadoop/io/compress/CodecPool.py b/hadoop/io/compress/CodecPool.py index 7608cb0..a923cc7 100644 --- a/hadoop/io/compress/CodecPool.py +++ b/hadoop/io/compress/CodecPool.py @@ -18,9 +18,9 @@ from hadoop.util import ReflectionUtils -from BZip2Codec import * -from ZlibCodec import * -from GzipCodec import * +from .BZip2Codec import * +from .ZlibCodec import * +from .GzipCodec import * class CodecPool(object): def __new__(cls, *p, **k): diff --git a/hadoop/io/compress/GzipCodec.py b/hadoop/io/compress/GzipCodec.py index 5098983..2308060 100644 --- a/hadoop/io/compress/GzipCodec.py +++ b/hadoop/io/compress/GzipCodec.py @@ -19,19 +19,19 @@ import gzip from hadoop.io.InputStream import DataInputBuffer -import StringIO +import io class GzipCodec: def compress(self, data): - ioObj = StringIO.StringIO() - f = gzip.GzipFile(fileobj = ioObj, mode='wb') + ioObj = io.StringIO() + f = gzip.GzipFile(fileobj=ioObj, mode='wb') f.write(data) f.close() return ioObj.getValue() def decompress(self, data): - ioObj = StringIO.StringIO(data) - f = gzip.GzipFile(fileobj = ioObj, mode='rb') + ioObj = io.StringIO(data) + f = gzip.GzipFile(fileobj=ioObj, mode='rb') d = f.read() f.close() return d diff --git a/hadoop/io/compress/__init__.py b/hadoop/io/compress/__init__.py index 1d0f940..597ab72 100644 --- a/hadoop/io/compress/__init__.py +++ b/hadoop/io/compress/__init__.py @@ -15,6 +15,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+"""Hadoop I/O Compression Codecs""" -from CodecPool import * +from .CodecPool import * diff --git a/hadoop/pydoop/reader.py b/hadoop/pydoop/reader.py index cd861c3..9753b36 100644 --- a/hadoop/pydoop/reader.py +++ b/hadoop/pydoop/reader.py @@ -68,7 +68,7 @@ def __init__(self, context): def close(self): self.seq_file.close() - def next(self): + def __next__(self): if (self.seq_file.next(self._key, self._value)): return (True, self._key.toString(), self._value.toString()) else: diff --git a/hadoop/util/ReflectionUtils.py b/hadoop/util/ReflectionUtils.py index 32bf3d1..3a7cf9a 100644 --- a/hadoop/util/ReflectionUtils.py +++ b/hadoop/util/ReflectionUtils.py @@ -16,8 +16,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from hadoop.io import * - def hadoopClassFromName(class_path): if class_path.startswith('org.apache.hadoop.'): class_path = class_path[11:] @@ -41,6 +39,6 @@ def classFromName(class_path): if not module_name: raise ValueError('Class name must contain module part.') - module = __import__(module_name, globals(), locals(), [str(class_name)], -1) + module = __import__(module_name, globals(), locals(), [str(class_name)], 0) return getattr(module, class_name) diff --git a/hadoop/util/__init__.py b/hadoop/util/__init__.py index 7c0ac5f..403ce42 100644 --- a/hadoop/util/__init__.py +++ b/hadoop/util/__init__.py @@ -15,6 +15,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+"""Utilities""" -from ReflectionUtils import * +from .ReflectionUtils import * diff --git a/setup.py b/setup.py index da8a68c..4088584 100644 --- a/setup.py +++ b/setup.py @@ -2,17 +2,20 @@ from setuptools import setup -setup(name='Hadoop', - version='0.1.4', +setup(name='hadoop', + version='0.2.0', description='Python Hadoop I/O Utilities', license="Apache Software License 2.0 (ASF)", author='Matteo Bertozzi', author_email='theo.bertozzi@gmail.com', - url='http://hadoop.apache.org', + maintainer='Sebastian Nagel', + maintainer_email='sebastian@commoncrawl.org', + url='https://github.com/commoncrawl/python-hadoop', packages=["hadoop", 'hadoop.util', 'hadoop.io', 'hadoop.io.compress', "hadoop.pydoop"], + require=['lzraw>=0.0.1'], extras_require = { - 'pydoop': ['pydoop>=0.9.1'] - } + 'pydoop': ['pydoop>=2.0.0'] + } )