Skip to content

Commit e1deaa7

Browse files
Pure python MapFile, SetFile, ArrayFile
1 parent d8d53a0 commit e1deaa7

8 files changed

Lines changed: 350 additions & 7 deletions

File tree

python-hadoop/ArrayFileTest.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#!/usr/bin/env python
2+
3+
from io.IntWritable import LongWritable, IntWritable
4+
from io import ArrayFile
5+
6+
if __name__ == '__main__':
7+
writer = ArrayFile.Writer('array-test', IntWritable)
8+
writer.INDEX_INTERVAL = 16
9+
for i in xrange(0, 100):
10+
writer.append(IntWritable(1 + i * 10))
11+
writer.close()
12+
13+
key = LongWritable()
14+
value = IntWritable()
15+
reader = ArrayFile.Reader('array-test')
16+
while reader.next(key, value):
17+
print key, value
18+
19+
print 'GET 8'
20+
print reader.get(8, value)
21+
print value
22+
print
23+
24+
print 'GET 110'
25+
print reader.get(110, value)
26+
print
27+
28+
print 'GET 25'
29+
print reader.get(25, value)
30+
print value
31+
print
32+
33+
print 'GET 55'
34+
print reader.get(55, value)
35+
print value
36+
print
37+
38+
reader.close()

python-hadoop/MapFileTest.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#!/usr/bin/env python
2+
3+
from io.IntWritable import LongWritable
4+
from io import MapFile
5+
6+
if __name__ == '__main__':
7+
writer = MapFile.Writer('map-test', LongWritable, LongWritable)
8+
writer.INDEX_INTERVAL = 2
9+
for i in xrange(0, 100, 2):
10+
writer.append(LongWritable(i), LongWritable(i * 10))
11+
writer.close()
12+
13+
key = LongWritable()
14+
value = LongWritable()
15+
reader = MapFile.Reader('map-test')
16+
while reader.next(key, value):
17+
print key, value
18+
19+
print 'GET CLOSEST'
20+
key.set(8)
21+
print reader.get(key, value)
22+
print value
23+
print
24+
25+
print 'GET 111'
26+
key.set(111)
27+
print reader.get(key, value)
28+
print
29+
30+
key.set(25)
31+
print 'SEEK 25 before'
32+
print reader.getClosest(key, value, before=True)
33+
print value
34+
print
35+
36+
key.set(55)
37+
print 'SEEK 55'
38+
print reader.getClosest(key, value)
39+
print value
40+
print
41+
42+
reader.close()

python-hadoop/SetFileTest.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#!/usr/bin/env python
2+
3+
from io.IntWritable import IntWritable
4+
from io import SetFile
5+
6+
if __name__ == '__main__':
7+
writer = SetFile.Writer('set-test', IntWritable)
8+
writer.INDEX_INTERVAL = 16
9+
for i in xrange(0, 100, 2):
10+
writer.append(IntWritable(i * 10))
11+
writer.close()
12+
13+
key = IntWritable()
14+
reader = SetFile.Reader('set-test')
15+
while reader.next(key):
16+
print key
17+
18+
print 'GET 8'
19+
key.set(8)
20+
print reader.get(key)
21+
print
22+
23+
print 'GET 120'
24+
key.set(120)
25+
print reader.get(key)
26+
print
27+
28+
print 'GET 240'
29+
key.set(240)
30+
print reader.get(key)
31+
print
32+
33+
print 'GET 550'
34+
key.set(550)
35+
print reader.get(key)
36+
print
37+
38+
reader.close()

python-hadoop/io/ArrayFile.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#!/usr/bin/env python
2+
3+
from IntWritable import LongWritable
4+
import MapFile
5+
6+
class Writer(MapFile.Writer):
7+
def __init__(self, path, value_class):
8+
super(Writer, self).__init__(path, LongWritable, value_class)
9+
self._count = 0
10+
11+
def append(self, value):
12+
super(Writer, self).append(LongWritable(self._count), value)
13+
self._count += 1
14+
15+
class Reader(MapFile.Reader):
16+
def __init__(self, path):
17+
super(Reader, self).__init__(path)
18+
self._key = LongWritable(0)
19+
20+
def seek(self, n):
21+
if isinstance(n, LongWritable):
22+
n = n.get()
23+
24+
self._key.set(n)
25+
return super(Reader, self).seek(self._key)
26+
27+
def key(self):
28+
return self._key.get()
29+
30+
def get(self, n, value):
31+
self._key.set(n)
32+
return(super(Reader, self).get(self._key, value))
33+

python-hadoop/io/MapFile.py

Lines changed: 182 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,187 @@
11
#!/usr/bin/env python
22

3+
import os
4+
5+
from IntWritable import LongWritable
6+
import SequenceFile
7+
8+
INDEX_FILE_NAME = 'index'
9+
DATA_FILE_NAME = 'data'
10+
311
class Writer(object):
4-
pass
12+
INDEX_INTERVAL = 128
13+
14+
def __init__(self, dirname, key_class, value_class):
15+
os.mkdir(dirname)
16+
17+
data_path = os.path.join(dirname, DATA_FILE_NAME)
18+
self._data = SequenceFile.createWriter(data_path, key_class, value_class)
19+
20+
index_path = os.path.join(dirname, INDEX_FILE_NAME)
21+
self._index = SequenceFile.createBlockWriter(index_path, key_class, LongWritable)
22+
23+
self._size = 0
24+
self._last_index_pos = -1
25+
self._last_index_nkeys = -4294967295
26+
27+
def close(self):
28+
self._data.close()
29+
self._index.close()
30+
31+
def append(self, key, value):
32+
self._checkKey(key)
33+
34+
pos = self._data.getLength()
35+
if self._size >= self._last_index_nkeys + self.INDEX_INTERVAL and pos > self._last_index_pos:
36+
self._index.append(key, LongWritable(pos))
37+
self._last_index_pos = pos
38+
self._last_index_nkeys = self._size
39+
40+
self._data.append(key, value)
41+
self._size += 1
42+
43+
def _checkKey(self, key):
44+
pass
545

646
class Reader(object):
7-
pass
47+
INDEX_SKIP = 0
48+
49+
def __init__(self, dirname):
50+
self._data = SequenceFile.Reader(os.path.join(dirname, DATA_FILE_NAME))
51+
self._index = SequenceFile.Reader(os.path.join(dirname, INDEX_FILE_NAME))
52+
self._first_position = self._data.getPosition()
53+
self._positions = []
54+
self._keys = []
55+
56+
def close(self):
57+
self._data.close()
58+
self._index.close()
59+
60+
def getIndexInterval(self):
61+
return self._index_interval
62+
63+
def setIndexInterval(self, interval):
64+
self._index_interval = interval
65+
66+
def reset(self):
67+
self._data.seek(self._first_position)
68+
69+
def midKey(self):
70+
self._readIndex()
71+
count = len(self._keys)
72+
if count == 0:
73+
return None
74+
return keys[(count - 1) >> 1]
75+
76+
def finalKey(self, key):
77+
original_position = self._data.getPosition()
78+
try:
79+
self._readIndex()
80+
count = len(self._keys)
81+
if count > 0:
82+
self._data.seek(self._positions[count - 1])
83+
else:
84+
self._reset()
85+
while self._data.nextKey(key):
86+
continue
87+
finally:
88+
self._data.seek(original_position)
89+
90+
def seek(self, key):
91+
return self._seekInternal(key) == 0
92+
93+
def next(self, key, value):
94+
return self._data.next(key, value)
95+
96+
def get(self, key, value):
97+
if self.seek(key):
98+
self._data._getCurrentValue(value)
99+
return value
100+
return None
101+
102+
def getClosest(self, key, value, before=False):
103+
c = self._seekInternal(key, before)
104+
if (not before and c > 0) or (before and c < 0):
105+
return None
106+
107+
self._data._getCurrentValue(value)
108+
return self._next_key
109+
110+
def _readIndex(self):
111+
if self._keys:
112+
return
113+
114+
key_class = self._index.getKeyClass()
115+
116+
skip = self.INDEX_SKIP
117+
position = LongWritable()
118+
last_position = None
119+
while True:
120+
key = key_class()
121+
if not self._index.next(key, position):
122+
break
123+
124+
if skip > 0:
125+
skip -= 1
126+
continue
127+
128+
skip = self.INDEX_SKIP
129+
if position.get() == last_position:
130+
continue
131+
132+
self._positions.append(position.get())
133+
self._keys.append(key)
134+
135+
def _seekInternal(self, key, before=None):
136+
self._readIndex()
137+
138+
seek_index = self._indexSearch(key)
139+
if seek_index < 0:
140+
seek_index = -seek_index - 2
141+
142+
if seek_index == -1:
143+
seek_position = self._first_position
144+
else:
145+
seek_position = self._positions[seek_index]
146+
147+
prev_position = -1
148+
curr_position = seek_position
149+
150+
key_class = self._data.getKeyClass()
151+
self._next_key = key_class()
152+
153+
self._data.seek(seek_position)
154+
while self._data.nextKey(self._next_key):
155+
cmp = key.compareTo(self._next_key)
156+
if cmp <= 0:
157+
if before and cmp != 0:
158+
if prev_position == -1:
159+
self._data.seek(curr_position)
160+
else:
161+
self._data.seek(prev_position)
162+
self._data.nextKey(self._next_key)
163+
return 1
164+
return cmp
165+
166+
if before:
167+
prev_position = curr_position
168+
curr_position = self._data.getPosition()
169+
170+
return 1
171+
172+
def _indexSearch(self, key):
173+
high = len(self._keys) - 1
174+
low = 0
175+
176+
while low <= high:
177+
mid = (low + high) >> 1
178+
179+
cmp = self._keys[mid].compareTo(key)
180+
if cmp < 0:
181+
low = mid + 1
182+
elif cmp > 0:
183+
high = mid - 1
184+
else:
185+
return mid
186+
return -(low + 1)
187+

python-hadoop/io/SetFile.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
#!/usr/bin/env python
22

3-
import NullWritable
3+
from NullWritable import NullWritable
44
import MapFile
55

66
class Writer(MapFile.Writer):
7+
def __init__(self, path, key_class):
8+
super(Writer, self).__init__(path, key_class, NullWritable)
9+
710
def append(self, key):
811
return super(Writer, self).append(key, NullWritable())
912

@@ -13,7 +16,6 @@ def next(self, key):
1316

1417
def get(self, key):
1518
if self.seek(key):
16-
self.next(key)
17-
return key
19+
return self._next_key
1820
return None
1921

0 commit comments

Comments
 (0)