31 from daal
import step1Local, step2Master, step3Master
32 import daal.algorithms.kmeans
as kmeans
33 import daal.algorithms.kmeans.init
as init
34 from daal
import step1Local, step2Master, step2Local, step3Master, step4Local, step5Master
35 from daal.data_management
import FileDataSource, DataSource, RowMergedNumericTable
# Put the directory one level above this script's folder (the place the
# `utils` helpers imported just below are loaded from) at the front of the
# module search path, unless it is already listed.
utils_folder = os.path.realpath(
    os.path.abspath(
        os.path.dirname(os.path.dirname(__file__))
    )
)
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
40 from utils
import printNumericTable, createSparseTable
# Floating-point type passed as `fptype` to every DAAL algorithm below.
algorithmFPType = np.float32

# Rows per input block; initStep1 derives the total row count and each
# block's row offset from it.
nVectorsInBlock = 2500

# NOTE(review): nClusters, nIterations and nBlocks are referenced throughout
# this file but are not defined anywhere in it — confirm they belong here.

# Input CSV files, one per data block (loadData reads files[i] for block i).
DAAL_PREFIX = os.path.join('..', 'data', 'distributed')
dataFileNames = [
    os.path.join(DAAL_PREFIX, blockFile)
    for blockFile in (
        'kmeans_dense_1.csv',
        'kmeans_dense_2.csv',
        'kmeans_dense_3.csv',
        'kmeans_dense_4.csv',
    )
]
def loadData(files):
    """Load one numeric table per data block from the given files.

    Args:
        files: sequence of file paths; ``files[i]`` is read for every block
            index ``i`` in ``range(nBlocks)``.

    Returns:
        list of the numeric tables produced by ``FileDataSource``, one per
        block, in block order.
    """
    # NOTE(review): the def line, the list initialisation and the return were
    # absent from this file; they are restored from how the body and the
    # ``data = loadData(dataFileNames)`` call use them — verify.
    data = []
    for i in range(nBlocks):
        # The flags presumably ask DAAL to allocate the table and build its
        # dictionary from the file itself — confirm against the DAAL docs.
        dataSource = FileDataSource(files[i],
                                    DataSource.doAllocateNumericTable,
                                    DataSource.doDictionaryFromContext)
        dataSource.loadDataBlock()
        data.append(dataSource.getNumericTable())
    return data
def initStep1(data, method):
    """Run K-Means init step 1 on each block and return the first centroids produced.

    Block ``i`` is given the total row count ``nBlocks * nVectorsInBlock``
    and its own row offset ``i * nVectorsInBlock``.

    Args:
        data: per-block input tables.
        method: K-Means init method (e.g. ``kmeans.init.plusPlusDense``).

    Returns:
        The ``partialCentroids`` table of the first block whose result is not
        None, or None if no block produced one.
    """
    for i in range(nBlocks):
        local = kmeans.init.Distributed(step1Local, nClusters, nBlocks*nVectorsInBlock, i*nVectorsInBlock,
                                        fptype=algorithmFPType, method=method)
        local.input.set(kmeans.init.data, data[i])
        pNewCenters = local.compute().get(kmeans.init.partialCentroids)
        # NOTE(review): the original had no return, so callers got None.
        # Presumably only the block holding the first chosen row yields a
        # table and the others yield None — confirm against the DAAL docs.
        if pNewCenters is not None:
            return pNewCenters
    return None
def initStep23(data, localNodeData, step2Input, step3, bFirstIteration, method):
    """Run init step 2 on every block and feed each result to the step-3 master.

    Args:
        data: per-block input tables.
        localNodeData: list of per-block ``internalResult`` objects. It is
            appended to on the first iteration and read by index afterwards.
        step2Input: table handed to every block as ``inputOfStep2``.
        step3: step-3 master algorithm object, reused across calls.
        bFirstIteration: True on the first call for a given ``localNodeData``.
        method: K-Means init method.

    Returns:
        Whatever ``step3.compute()`` returns.
    """
    for i, block in enumerate(data):
        step2 = kmeans.init.Distributed(step2Local, nClusters, bFirstIteration, fptype=algorithmFPType, method=method)
        step2.input.set(kmeans.init.data, block)
        step2.input.setStepInput(kmeans.init.inputOfStep2, step2Input)
        if not bFirstIteration:
            # Later iterations resume from the state saved for this block.
            step2.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        # `res` was used but never assigned in the original.
        res = step2.compute()
        if bFirstIteration:
            # Save each block's state exactly once. Appending on every call
            # grew the list without bound, while later calls only ever read
            # localNodeData[0 .. len(data) - 1].
            localNodeData.append(res.getLocal(kmeans.init.internalResult))
        step3.input.add(kmeans.init.inputOfStep3FromStep2, i, res.getOutput(kmeans.init.outputOfStep2ForStep3))
    return step3.compute()
def initStep4(data, localNodeData, step3res, method):
    """Run init step 4 on the blocks that received step-3 output and collect the results.

    Args:
        data: per-block input tables.
        localNodeData: per-block ``internalResult`` objects from step 2.
        step3res: result of the step-3 master.
        method: K-Means init method.

    Returns:
        None if no block produced output, the single table if exactly one
        did, otherwise a ``RowMergedNumericTable`` to which every result is
        added in block order.
    """
    aRes = []
    for i, block in enumerate(data):
        step3Output = step3res.getOutput(kmeans.init.outputOfStep3ForStep4, i)
        # NOTE(review): assumes blocks with no step-3 output yield None —
        # confirm against the DAAL docs.
        if step3Output is None:
            continue
        step4 = kmeans.init.Distributed(step4Local, nClusters, fptype=algorithmFPType, method=method)
        step4.input.setInput(kmeans.init.data, block)
        step4.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        step4.input.setStepInput(kmeans.init.inputOfStep4FromStep3, step3Output)
        aRes.append(step4.compute().get(kmeans.init.outputOfStep4))

    # Explicit empty / single-table cases; the original never returned anything.
    if not aRes:
        return None
    if len(aRes) == 1:
        return aRes[0]
    pMerged = RowMergedNumericTable()
    for r in aRes:
        pMerged.addNumericTable(r)
    return pMerged
def initCentroids_plusPlusDense(data):
    """Compute initial centroids with the distributed plusPlusDense method.

    Step 1 supplies the first centroid table. Each of the ``nClusters - 1``
    further passes of steps 2-4 appends the table returned by step 4.

    Args:
        data: per-block input tables.

    Returns:
        ``RowMergedNumericTable`` holding every centroid table, in the order
        it was produced.
    """
    method = kmeans.init.plusPlusDense
    # Per-block state filled by the first step-2 pass and reused afterwards
    # (it was used but never initialised in the original).
    localNodeData = []
    pCentroids = RowMergedNumericTable()

    pNewCentroids = initStep1(data, method)
    pCentroids.addNumericTable(pNewCentroids)

    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)
    for iCenter in range(1, nClusters):
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, iCenter == 1, method=method)
        pNewCentroids = initStep4(data, localNodeData, step3res, method=method)
        pCentroids.addNumericTable(pNewCentroids)
    return pCentroids
def initCentroids_parallelPlusDense(data):
    """Compute initial centroids with the distributed parallelPlusDense method.

    Step 1 supplies the first candidates. Each of ``step5.parameter.nRounds``
    rounds of steps 2-4 adds more candidates to the step-5 master. A final
    step-2 pass (with ``outputForStep5Required``) and the last step-3 result
    then feed step 5, which produces the centroids.

    Args:
        data: per-block input tables.

    Returns:
        The ``centroids`` table from the step-5 master.

    Raises:
        ValueError: if ``nRounds`` is 0. No step-2/3 round would run, so the
            per-block state and the step-3 result that step 5 needs would not
            exist.
    """
    method = kmeans.init.parallelPlusDense
    # Per-block state filled by the first step-2 round (it was used but never
    # initialised in the original).
    localNodeData = []
    pNewCentroids = initStep1(data, method=method)

    step5 = kmeans.init.Distributed(step5Master, nClusters, fptype=algorithmFPType, method=method)
    step5.input.add(kmeans.init.inputCentroids, pNewCentroids)

    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)
    step3res = None
    for iRound in range(step5.parameter.nRounds):
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, iRound == 0, method=method)
        pNewCentroids = initStep4(data, localNodeData, step3res, method=method)
        step5.input.add(kmeans.init.inputCentroids, pNewCentroids)
    if step3res is None:
        raise ValueError("parallelPlusDense initialization requires nRounds >= 1")

    # Final step-2 pass that also emits the per-block data step 5 consumes.
    for i in range(nBlocks):
        local = kmeans.init.Distributed(step2Local, nClusters, False, fptype=algorithmFPType, method=method)
        local.parameter.outputForStep5Required = True
        local.input.setInput(kmeans.init.data, data[i])
        local.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        local.input.setStepInput(kmeans.init.inputOfStep2, pNewCentroids)
        step5.input.add(kmeans.init.inputOfStep5FromStep2, local.compute().getOutput(kmeans.init.outputOfStep2ForStep5))

    step5.input.setStepInput(kmeans.init.inputOfStep5FromStep3, step3res.getStepOutput(kmeans.init.outputOfStep3ForStep5))
    # NOTE(review): compute() is added before finalizeCompute() to match the
    # master-step pattern used in calculateCentroids — confirm against the
    # DAAL step5Master documentation.
    step5.compute()
    return step5.finalizeCompute().get(kmeans.init.centroids)
def initCentroids(data, method):
    """Compute initial centroids with the distributed initializer for ``method``.

    Args:
        data: per-block input tables.
        method: ``kmeans.init.parallelPlusDense`` or ``kmeans.init.plusPlusDense``.

    Returns:
        The centroids produced by the matching initializer.

    Raises:
        ValueError: if ``method`` is neither supported value.
    """
    if method == kmeans.init.parallelPlusDense:
        return initCentroids_parallelPlusDense(data)
    if method == kmeans.init.plusPlusDense:
        return initCentroids_plusPlusDense(data)
    # An `assert` here is stripped under `python -O`, which made an unknown
    # method silently return None; raise unconditionally instead.
    raise ValueError("Unknown method for initCentroids")
def calculateCentroids(initialCentroids, data):
    """Refine ``initialCentroids`` with distributed K-Means, then assign rows and print results.

    Each of the ``nIterations`` passes does two things. It runs
    ``step1Local`` on every block against the current centroids, adding each
    partial result to one ``step2Master`` object. It then calls
    ``compute()`` and ``finalizeCompute()`` on that master to get the new
    centroids and objective function. Afterwards, each block is run through
    a batch K-Means object against the final centroids to obtain its
    assignments.

    Prints:
        the first block's assignments, the centroids and the objective
        function value.

    Args:
        initialCentroids: starting centroid table.
        data: per-block input tables.
    """
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters, fptype=algorithmFPType)

    centroids = initialCentroids
    objectiveFunction = None

    for _ in range(nIterations):
        for i in range(nBlocks):
            localAlgorithm = kmeans.Distributed(step1Local, nClusters, False, fptype=algorithmFPType)
            localAlgorithm.input.set(kmeans.data, data[i])
            localAlgorithm.input.set(kmeans.inputCentroids, centroids)
            masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

        masterAlgorithm.compute()
        res = masterAlgorithm.finalizeCompute()
        centroids = res.get(kmeans.centroids)
        objectiveFunction = res.get(kmeans.objectiveFunction)

    # `assignments` was used but never initialised in the original.
    assignments = []
    for i in range(nBlocks):
        # The second argument 0 is presumably the iteration count, so the
        # batch run only assigns rows to the given centroids — confirm.
        # Fixed typo: the keyword was `fptyep`, so `fptype` was never passed.
        localAlgorithm = kmeans.Batch(nClusters, 0, fptype=algorithmFPType)
        localAlgorithm.input.set(kmeans.data, data[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)
        assignments.append(localAlgorithm.compute().get(kmeans.assignments))

    printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10)
    printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10)
    printNumericTable(objectiveFunction, "Objective function value:")
def runKMeans(data, method, methodName):
    """Report ``methodName``, initialise centroids with ``method``, then refine and print them.

    Args:
        data: per-block input tables.
        method: K-Means init method forwarded to ``initCentroids``.
        methodName: human-readable label printed before the run.
    """
    banner = "K-means init parameters: method = %s" % (methodName,)
    print(banner)
    calculateCentroids(initCentroids(data, method=method), data)
if __name__ == "__main__":
    # Load the blocks once, then run the full pipeline with each init method
    # in turn: plusPlusDense first, parallelPlusDense second.
    data = loadData(dataFileNames)
    for methodArgs in ((kmeans.init.plusPlusDense, "plusPlusDense"),
                       (kmeans.init.parallelPlusDense, "parallelPlusDense")):
        runKMeans(data, *methodArgs)