31 from daal
import step1Local, step2Master, step3Master
32 import daal.algorithms.kmeans
as kmeans
33 import daal.algorithms.kmeans.init
as init
34 from daal
import step1Local, step2Master, step2Local, step3Master, step4Local, step5Master
35 from daal.data_management
import RowMergedNumericTable
# Put the parent of this script's directory on the import path so that the
# `utils` module imported just below can be resolved.
utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
if utils_folder not in sys.path:
    # Prepend so this folder takes precedence over other path entries.
    sys.path.insert(0, utils_folder)
from utils import printNumericTable, createSparseTable
# Floating-point type handed to the algorithm objects through `fptype`.
algorithmFPType = np.float32
# Multiplied by the block count / block index when step 1 of the
# initialization is configured (presumably rows per block -- TODO confirm).
nVectorsInBlock = 8000

# Input data sets, one CSV file per data block, relative to the working directory.
DAAL_PREFIX = os.path.join('..', 'data', 'distributed')
dataFileNames = [
    os.path.join(DAAL_PREFIX, 'kmeans_csr_1.csv'),
    os.path.join(DAAL_PREFIX, 'kmeans_csr_2.csv'),
    os.path.join(DAAL_PREFIX, 'kmeans_csr_3.csv'),
    os.path.join(DAAL_PREFIX, 'kmeans_csr_4.csv'),
]
def loadData(files):
    """Load each path in *files* with ``createSparseTable``.

    Returns a list holding one table per input path, in the same order.
    """
    return [createSparseTable(f, ntype=np.float32) for f in files]
def initStep1(data, method):
    """Run step 1 (local) of the distributed K-Means initialization.

    A ``step1Local`` algorithm is created and run for each of the ``nBlocks``
    data blocks in turn, using *method* as the initialization method.

    Returns the first partial-centroids table produced by a block, or
    ``None`` when no block produced one.
    """
    for i in range(nBlocks):
        # Each block is told the total row count and its own starting offset.
        local = kmeans.init.Distributed(step1Local, nClusters, nBlocks * nVectorsInBlock, i * nVectorsInBlock,
                                        fptype=algorithmFPType, method=method)
        local.input.set(kmeans.init.data, data[i])
        pNewCenters = local.compute().get(kmeans.init.partialCentroids)
        # NOTE(review): assumes blocks that did not select a centroid yield
        # None here -- confirm against the DAAL documentation.
        if pNewCenters is not None:
            return pNewCenters
    return None
def initStep23(data, localNodeData, step2Input, step3, bFirstIteration, method):
    """Run step 2 on every data block, then step 3 on the master.

    data            -- sequence of per-block input tables.
    localNodeData   -- list of per-block internal state; filled in place on
                       the first iteration and read back on later ones.
    step2Input      -- table passed to every block as ``inputOfStep2``.
    step3           -- master-side step-3 algorithm collecting the outputs.
    bFirstIteration -- True on the first call for a given initialization run.
    method          -- initialization method forwarded to the step-2 objects.

    Returns the result of ``step3.compute()``.
    """
    for i, block in enumerate(data):
        step2 = kmeans.init.Distributed(step2Local, nClusters, bFirstIteration,
                                        fptype=algorithmFPType, method=method)
        step2.input.set(kmeans.init.data, block)
        step2.input.setStepInput(kmeans.init.inputOfStep2, step2Input)
        if not bFirstIteration:
            # Later iterations continue from the state saved for this block.
            step2.input.setLocal(kmeans.init.internalInput, localNodeData[i])

        res = step2.compute()

        if bFirstIteration:
            # Store the state once so that index i keeps addressing block i
            # and the list does not grow on every round.
            localNodeData.append(res.getLocal(kmeans.init.internalResult))
        step3.input.add(kmeans.init.inputOfStep3FromStep2, i, res.getOutput(kmeans.init.outputOfStep2ForStep3))
    return step3.compute()
def initStep4(data, localNodeData, step3res, method):
    """Run step 4 on the data blocks and merge the per-block outputs.

    data          -- sequence of per-block input tables.
    localNodeData -- per-block internal state saved by step 2.
    step3res      -- step-3 result from which each block's input is read.
    method        -- initialization method forwarded to the step-4 objects.

    Returns a ``RowMergedNumericTable`` holding the step-4 output of every
    block that was processed, in block order.
    """
    aRes = []
    for i, block in enumerate(data):
        step3Output = step3res.getOutput(kmeans.init.outputOfStep3ForStep4, i)
        # NOTE(review): assumes step 3 yields None for blocks that have no
        # work in this round -- confirm against the DAAL documentation.
        if step3Output is None:
            continue

        step4 = kmeans.init.Distributed(step4Local, nClusters, fptype=algorithmFPType, method=method)
        step4.input.setInput(kmeans.init.data, block)
        step4.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        step4.input.setStepInput(kmeans.init.inputOfStep4FromStep3, step3Output)

        aRes.append(step4.compute().get(kmeans.init.outputOfStep4))

    pMerged = RowMergedNumericTable()
    for r in aRes:
        pMerged.addNumericTable(r)
    return pMerged
def initCentroids_plusPlusCSR(data):
    """Compute initial centroids with the ``plusPlusCSR`` method.

    Step 1 supplies the first centroid table; each of the remaining
    ``nClusters - 1`` loop iterations runs steps 2-4 and appends the table
    they produce.

    Returns the ``RowMergedNumericTable`` that accumulates those tables.
    """
    method = kmeans.init.plusPlusCSR

    pCentroids = RowMergedNumericTable()
    pNewCentroids = initStep1(data, method=method)
    pCentroids.addNumericTable(pNewCentroids)

    # Per-block internal state, filled by initStep23 on its first call.
    localNodeData = []
    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)
    for iCenter in range(1, nClusters):
        # iCenter == 1 marks the first pass through steps 2-3.
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, iCenter == 1, method=method)
        pNewCentroids = initStep4(data, localNodeData, step3res, method=method)
        pCentroids.addNumericTable(pNewCentroids)
    return pCentroids
def initCentroids_parallelPlusCSR(data):
    """Compute initial centroids with the ``parallelPlusCSR`` method.

    Step 1 supplies the first candidate table. Steps 2-4 are then repeated
    ``step5.parameter.nRounds`` times, feeding every candidate table into the
    step-5 master. Finally each block contributes its step-2 output for
    step 5 and the master produces the centroids.

    Returns the table stored under ``kmeans.init.centroids`` in the final
    step-5 result.
    """
    method = kmeans.init.parallelPlusCSR

    pNewCentroids = initStep1(data, method=method)

    # Per-block internal state, filled by initStep23 on its first call.
    localNodeData = []

    step5 = kmeans.init.Distributed(step5Master, nClusters, fptype=algorithmFPType, method=method)
    step5.input.add(kmeans.init.inputCentroids, pNewCentroids)

    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)
    for iRound in range(step5.parameter.nRounds):
        # iRound == 0 marks the first pass through steps 2-3.
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, iRound == 0, method=method)
        pNewCentroids = initStep4(data, localNodeData, step3res, method=method)
        step5.input.add(kmeans.init.inputCentroids, pNewCentroids)

    # One more step-2 pass per block, this time requesting the step-5 output.
    for i in range(nBlocks):
        local = kmeans.init.Distributed(step2Local, nClusters, False, fptype=algorithmFPType, method=method)
        local.parameter.outputForStep5Required = True

        local.input.setInput(kmeans.init.data, data[i])
        local.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        local.input.setStepInput(kmeans.init.inputOfStep2, pNewCentroids)

        step5.input.add(kmeans.init.inputOfStep5FromStep2,
                        local.compute().getOutput(kmeans.init.outputOfStep2ForStep5))

    step5.input.setStepInput(kmeans.init.inputOfStep5FromStep3,
                             step3res.getStepOutput(kmeans.init.outputOfStep3ForStep5))
    # Same compute-then-finalize order as the master step in
    # calculateCentroids.
    step5.compute()
    return step5.finalizeCompute().get(kmeans.init.centroids)
def initCentroids(data, method):
    """Dispatch to the initialization routine that matches *method*.

    data   -- per-block input tables, passed through unchanged.
    method -- ``kmeans.init.parallelPlusCSR`` or ``kmeans.init.plusPlusCSR``.

    Returns whatever the selected routine returns.
    Raises AssertionError for any other *method*.
    """
    if method == kmeans.init.parallelPlusCSR:
        return initCentroids_parallelPlusCSR(data)
    if method == kmeans.init.plusPlusCSR:
        return initCentroids_plusPlusCSR(data)
    # Raised explicitly instead of `assert False` so the check still fires
    # when Python runs with -O; exception type and message are unchanged.
    raise AssertionError("Unknown method for initCentroids")
def calculateCentroids(initialCentroids, data):
    """Run the distributed K-Means iterations and print the results.

    initialCentroids -- table of starting centroids.
    data             -- per-block input tables, indexed 0 .. nBlocks - 1.

    For ``nIterations`` rounds, every block computes a partial result from
    the current centroids and the master combines them into new centroids
    and an objective-function value. Afterwards a zero-iteration batch run
    per block yields the cluster assignments. The first block's assignments,
    the centroids and the objective function are printed; nothing is
    returned.
    """
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters, fptype=algorithmFPType, method=kmeans.lloydCSR)

    centroids = initialCentroids
    objectiveFunction = None

    for _ in range(nIterations):
        for i in range(nBlocks):
            # `method=` (not `methods=`) so the CSR method is actually the
            # keyword every other algorithm object in this file uses.
            localAlgorithm = kmeans.Distributed(step1Local, nClusters, False,
                                                fptype=algorithmFPType, method=kmeans.lloydCSR)

            localAlgorithm.input.set(kmeans.data, data[i])
            localAlgorithm.input.set(kmeans.inputCentroids, centroids)

            masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

        masterAlgorithm.compute()
        res = masterAlgorithm.finalizeCompute()

        centroids = res.get(kmeans.centroids)
        objectiveFunction = res.get(kmeans.objectiveFunction)

    assignments = []
    for i in range(nBlocks):
        # Second positional argument 0: no further centroid updates, the run
        # is only used to obtain assignments for the final centroids.
        # `fptype=` spelling corrected from `fptyep=`.
        localAlgorithm = kmeans.Batch(nClusters, 0, fptype=algorithmFPType, method=kmeans.lloydCSR)

        localAlgorithm.input.set(kmeans.data, data[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)

        assignments.append(localAlgorithm.compute().get(kmeans.assignments))

    printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10)
    printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10)
    printNumericTable(objectiveFunction, "Objective function value:")
def runKMeans(data, method, methodName):
    """Initialize centroids with *method*, then run K-Means on *data*.

    data       -- per-block input tables.
    method     -- initialization method passed to ``initCentroids``.
    methodName -- label for *method*, used only in the printed header.
    """
    print("K-means init parameters: method = " + str(methodName))
    centroids = initCentroids(data, method=method)
    calculateCentroids(centroids, data)
if __name__ == "__main__":
    # Load the data blocks once and run K-Means with each initialization method.
    data = loadData(dataFileNames)
    runKMeans(data, kmeans.init.plusPlusCSR, "plusPlusCSR")
    runKMeans(data, kmeans.init.parallelPlusCSR, "parallelPlusCSR")