-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathgrep.py
More file actions
53 lines (36 loc) · 1.59 KB
/
Copy pathgrep.py
File metadata and controls
53 lines (36 loc) · 1.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from __future__ import absolute_import, division, print_function, unicode_literals
from pyspark.sql import types as SparkTypes
from cosrlib.spark import sql, SparkPlugin
class Words(SparkPlugin):
    """ Finds documents containing a list of words in their indexable text (visible, non-boilerplate)

        Plugin arguments (read from self.args):
            words: whitespace-separated list of words to look for (required).

        Matching documents get a `grep_words` metadata entry listing the words
        that were found; the pipeline action then saves one
        "<word>,<word>,... <url>" line per matching document.
    """

    def init(self):
        """ Parses the `words` argument into the frozenset of words to search for.

            Raises ValueError if the argument is missing, empty, or contains
            nothing but whitespace.
        """
        # split() with no separator collapses runs of whitespace and ignores
        # leading/trailing whitespace, so "foo  bar " never yields an empty-string
        # "word" the way split(" ") does.
        words = (self.args.get("words") or "").split()

        # Validate the parsed result rather than the raw argument so a
        # whitespace-only value is rejected instead of silently matching nothing.
        if not words:
            raise ValueError("grep.Words plugin needs words!")

        self.words = frozenset(words)

    def hook_spark_pipeline_init(self, sc, sqlc, schema, indexer):
        """ Appends a nullable `grep_words` column (array of strings) to the pipeline schema """
        schema.append(SparkTypes.StructField(
            "grep_words",
            SparkTypes.ArrayType(SparkTypes.StringType()),
            nullable=True
        ))

    def hook_document_post_index(self, document, metadata):
        """ Filters a document post-indexing

            Stores the searched words present in the document under
            metadata["grep_words"]; leaves metadata untouched when none match.
        """
        # NOTE(review): get_all_words() is assumed to return a set-like object
        # exposing intersection() - confirm against the document class.
        doc_words = document.get_all_words()
        match = doc_words.intersection(self.words)

        if match:
            print("WORD MATCH", match, document.source_url.url)
            metadata["grep_words"] = list(match)

    def hook_spark_pipeline_action(self, sc, sqlc, df, indexer):
        """ Saves one text line per matching document: sorted matched words, a space, then the URL

            Always returns True.
            NOTE(review): presumably this tells the pipeline the action was
            handled - confirm against SparkPlugin.
        """
        # Only rows with at least one matched word are kept; the words are sorted
        # and comma-joined in SQL.
        lines_df = sql(sqlc, """
            SELECT CONCAT(CONCAT_WS(",", SORT_ARRAY(grep_words)), " ", url) r
            FROM df
            WHERE size(grep_words) > 0
        """, {"df": df})

        self.save_dataframe(lines_df, "text")

        return True
# class TextRegex(SparkPlugin):
# """ Finds documents matching a regex on their visible text """
# pass
# class HTMLRegex(SparkPlugin):
# """ Finds documents matching a regex on their raw HTML """
# pass