Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions src/java/org/apache/nutch/crawl/Generator2.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,18 +255,21 @@ public SelectorEntry() {
segnum = new IntWritable(0);
}

/**
 * Deserializes this entry from {@code in} by delegating to its three
 * fields in turn: {@code url}, then {@code datum}, then {@code segnum}.
 * This is the same field order that {@code write} uses, so the two
 * methods must be kept in sync.
 *
 * @param in the input to read the fields from
 * @throws IOException propagated from any of the field reads
 */
@Override
public void readFields(DataInput in) throws IOException {
url.readFields(in);
datum.readFields(in);
segnum.readFields(in);
}

/**
 * Serializes this entry to {@code out} by delegating to its three
 * fields in turn: {@code url}, then {@code datum}, then {@code segnum}.
 * This is the same field order that {@code readFields} consumes, so the
 * two methods must be kept in sync.
 *
 * @param out the output to write the fields to
 * @throws IOException propagated from any of the field writes
 */
@Override
public void write(DataOutput out) throws IOException {
url.write(out);
datum.write(out);
segnum.write(out);
}

@Override
public String toString() {
return "url=" + url.toString() + ", datum=" + datum.toString()
+ ", segnum=" + segnum.toString();
Expand Down Expand Up @@ -342,6 +345,11 @@ public void setup(
filters = new URLFilters(conf);
scfilters = new ScoringFilters(conf);
filter = conf.getBoolean(GENERATOR_FILTER, true);
normalise = conf.getBoolean(GENERATOR_NORMALISE, true);
if (normalise) {
normalizers = new URLNormalizers(conf,
URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
}
genDelay = conf.getLong(GENERATOR_DELAY, 7L) * 3600L * 24L * 1000L;
long time = conf.getLong(Nutch.GENERATE_TIME_KEY, 0L);
if (time > 0)
Expand All @@ -357,6 +365,7 @@ public void setup(
}

/** Select and invert subset due for fetch. */
@Override
public void map(Text key, CrawlDatum value, Context context)
throws IOException, InterruptedException {
String urlString = key.toString();
Expand All @@ -365,12 +374,13 @@ public void map(Text key, CrawlDatum value, Context context)
// If filtering is on don't generate URLs that don't pass
// URLFilters
try {
if (filters.filter(urlString) == null)
if (filters.filter(urlString) == null) {
context.getCounter("Generator", "URL_FILTERS_REJECTED").increment(1);
return;
} catch (URLFilterException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Couldn't filter url {}: {}", key, e.getMessage());
}
} catch (URLFilterException e) {
LOG.warn("Couldn't filter url {}: {}", key, e.getMessage());
context.getCounter("Generator", "URL_FILTER_EXCEPTION").increment(1);
}
}

Expand All @@ -394,9 +404,7 @@ public void map(Text key, CrawlDatum value, Context context)
try {
sort = scfilters.generatorSortValue(key, value, sort);
} catch (ScoringFilterException sfe) {
if (LOG.isWarnEnabled()) {
LOG.warn("Couldn't filter generatorSortValue for {}: {}", key, sfe);
}
LOG.warn("Couldn't filter generatorSortValue for {}: {}", key, sfe);
}

if (restrictStatus != null && !restrictStatus
Expand Down Expand Up @@ -675,6 +683,7 @@ private static Map<String, DomainLimits> readLimitsFile(Reader limitsReader,
* Limit the number of URLs per host/domain and assign segment number to
* every record.
*/
@Override
public void reduce(DomainScorePair key, Iterable<SelectorEntry> values,
Context context) throws IOException, InterruptedException {

Expand Down Expand Up @@ -971,6 +980,7 @@ public static class SegmenterMapper extends

SegmenterKey outputKey = new SegmenterKey();

@Override
public void map(FloatWritable key, SelectorEntry value, Context context)
throws IOException, InterruptedException {
outputKey.set(value.url, value.segnum);
Expand All @@ -996,6 +1006,7 @@ public void setup(Context context) {
mos = new MultipleOutputs<Text, SelectorEntry>(context);
}

@Override
public void reduce(SegmenterKey key, Iterable<SelectorEntry> values,
Context context) throws IOException, InterruptedException {
long count = 0;
Expand Down Expand Up @@ -1058,6 +1069,7 @@ public void setup(Context context) {
partitioner.setDomainLimits(SelectorReducer.readLimitsFile(conf, acceptor));
}

@Override
public void map(Text key, SelectorEntry value, Context context)
throws IOException, InterruptedException {
out.write("sequenceFilesPartitions", key, value.datum,
Expand All @@ -1084,6 +1096,7 @@ public HashComparator() {
super(Text.class);
}

@Override
@SuppressWarnings("rawtypes")
public int compare(WritableComparable a, WritableComparable b) {
Text url1 = (Text) a;
Expand All @@ -1093,6 +1106,7 @@ public int compare(WritableComparable a, WritableComparable b) {
return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1));
}

@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int hash1 = hash(b1, s1, l1);
int hash2 = hash(b2, s2, l2);
Expand All @@ -1104,7 +1118,7 @@ private static int hash(byte[] bytes, int start, int length) {
// make later bytes more significant in hash code, so that sorting
// by hashcode correlates less with by-host ordering.
for (int i = length - 1; i >= 0; i--)
hash = (31 * hash) + (int) bytes[start + i];
hash = (31 * hash) + bytes[start + i];
return hash;
}
}
Expand Down Expand Up @@ -1341,9 +1355,7 @@ public Path[] generate(Path dbDir, String dbVersion, Path segments,
*/
private List<Path> partitionSegments(FileSystem fs, Path segmentsDir,
List<Path> inputDirs, int numLists) throws Exception {
if (LOG.isInfoEnabled()) {
LOG.info("Generator: Partitioning selected urls for politeness.");
}
LOG.info("Generator: Partitioning selected urls for politeness.");

List<Path> generatedSegments = new ArrayList<Path>();

Expand Down Expand Up @@ -1420,6 +1432,7 @@ public static void main(String args[]) throws Exception {
System.exit(res);
}

@Override
public int run(String[] args) throws Exception {
if (args.length < 2) {
System.out.println(
Expand Down