Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

cluster.py 4.6 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
  1. """
  2. Usage:
  3. cluster.py [options]
  4. Options:
  5. -T FILE
  6. Write transcript to FILE.
  7. --dry-run
  8. Compute clusters but don't save them.
  9. """
  10. import os
  11. import sys
  12. import gzip
  13. import threading
  14. from textwrap import dedent
  15. from functools import reduce
  16. from natural.number import number
  17. from psycopg2 import sql
  18. import hashlib
  19. from docopt import docopt
  20. import pandas as pd
  21. import numpy as np
  22. from graph_tool.all import label_components
  23. from bookdata import db, tracking, script_log
  24. from bookdata.graph import GraphLoader
  25. from bookdata.schema import *
  26. _log = script_log(__name__)
  27. def cluster_isbns(isbn_recs, edges):
  28. """
  29. Compute ISBN clusters.
  30. """
  31. _log.info('initializing isbn vector')
  32. isbns = isbn_recs.groupby('isbn_id').record.min()
  33. index = isbns.index
  34. clusters = isbns.values
  35. _log.info('mapping edge IDs')
  36. edges = edges.assign(left_ino=index.get_indexer(edges.left_isbn).astype('i4'))
  37. assert np.all(edges.left_ino >= 0)
  38. edges = edges.assign(right_ino=index.get_indexer(edges.right_isbn).astype('i4'))
  39. assert np.all(edges.right_ino >= 0)
  40. _log.info('clustering')
  41. iters = _make_clusters(clusters, edges.left_ino.values, edges.right_ino.values)
  42. isbns = isbns.reset_index(name='cluster')
  43. _log.info('produced %s clusters in %d iterations',
  44. number(isbns.cluster.nunique()), iters)
  45. return isbns.loc[:, ['isbn_id', 'cluster']]
  46. def _make_clusters(clusters, ls, rs):
  47. """
  48. Compute book clusters. The input is initial cluster assignments and the left and right
  49. indexes for co-occuring ISBN edges; these are ISBNs that have connections to the same
  50. record in the bipartite ISBN-record graph.
  51. Args:
  52. clusters(ndarray): the initial cluster assignments
  53. ls(ndarray): the indexes of the left hand side of edges
  54. rs(ndarray): the indexes of the right hand side of edges
  55. """
  56. iters = 0
  57. nchanged = len(ls)
  58. while nchanged > 0:
  59. iters = iters + 1
  60. cdf = pd.DataFrame({
  61. 'idx': rs,
  62. 'cluster': np.minimum(clusters[ls], clusters[rs])
  63. })
  64. c = cdf.groupby('idx')['cluster'].min()
  65. nchanged = np.sum(c.values != clusters[c.index])
  66. _log.info('iteration %d changed %d clusters', iters, nchanged)
  67. clusters[c.index] = c.values
  68. return iters
  69. def _import_clusters(dbc, frame):
  70. with dbc.cursor() as cur:
  71. _log.info('creating cluster table')
  72. cur.execute(sql.SQL('DROP TABLE IF EXISTS isbn_cluster CASCADE'))
  73. cur.execute(sql.SQL('''
  74. CREATE TABLE isbn_cluster (
  75. isbn_id INTEGER NOT NULL,
  76. cluster INTEGER NOT NULL
  77. )
  78. '''))
  79. _log.info('loading %d clusters into isbn_cluster', len(frame))
  80. db.save_table(dbc, sql.SQL('isbn_cluster'), frame)
  81. with dbc.cursor() as cur:
  82. cur.execute(sql.SQL('ALTER TABLE isbn_cluster ADD PRIMARY KEY (isbn_id)'))
  83. cur.execute(sql.SQL('CREATE INDEX isbn_cluster_idx ON isbn_cluster (cluster)'))
  84. cur.execute(sql.SQL('ANALYZE isbn_cluster'))
  85. def _hash_frame(df):
  86. hash = hashlib.md5()
  87. for c in df.columns:
  88. hash.update(df[c].values.data)
  89. return hash.hexdigest()
def cluster(txout, dry=False):
    """
    Cluster ISBNs: load the ID graph, label its connected components, and
    (unless *dry*) save the resulting clusters to the database and disk.

    Args:
        txout: writable text stream receiving the transcript lines
            (NODES/EDGES/COMPONENTS/WRITE CLUSTERS).
        dry(bool): if True, compute clusters but skip all writes.
    """
    # NOTE(review): indentation reconstructed from a flattened paste; the
    # graph load appears scoped to the engine connection while everything
    # else runs under the psycopg2 connection — confirm against upstream.
    with db.connect() as dbc:
        tracking.begin_stage(dbc, 'cluster')
        with db.engine().connect() as cxn:
            _log.info('loading graph')
            gl = GraphLoader()
            g = gl.load_graph(cxn, False)
        print('NODES', g.num_vertices(), file=txout)
        print('EDGES', g.num_edges(), file=txout)
        _log.info('finding connected components')
        # Each connected component of the ISBN/record graph is one cluster.
        comps, hist = label_components(g)
        _log.info('found %d components, largest has %s items', len(hist), np.max(hist))
        print('COMPONENTS', len(hist), file=txout)
        # Keep only ISBN vertices; record vertices are scaffolding.
        is_isbn = g.vp.source.a == ns_isbn.code
        clusters = pd.DataFrame({
            'isbn_id': g.vp.label.a[is_isbn],
            'cluster': comps.a[is_isbn]
        })
        if not dry:
            _log.info('saving cluster records to database')
            with dbc:
                _import_clusters(dbc, clusters)
            _log.info('saving ID graph')
            g.vp['cluster'] = comps
            g.save('data/id-graph.gt')
        # Hash is recorded in the transcript and the stage-tracking table so
        # downstream steps can detect changed cluster assignments.
        c_hash = _hash_frame(clusters)
        print('WRITE CLUSTERS', c_hash, file=txout)
        tracking.end_stage(dbc, 'cluster', c_hash)
  119. opts = docopt(__doc__)
  120. tx_fn = opts.get('-T', None)
  121. if tx_fn == '-' or not tx_fn:
  122. tx_out = sys.stdout
  123. else:
  124. _log.info('writing transcript to %s', tx_fn)
  125. tx_out = open(tx_fn, 'w')
  126. cluster(tx_out, opts['--dry-run'])
Tip!

Press p to see the previous file, or n to see the next file

Comments

Loading...