Python* API Reference for Intel® Data Analytics Acceleration Library 2020 Update 1

kmeans_init_csr_distr.py

1 # file: kmeans_init_csr_distr.py
2 #===============================================================================
3 # Copyright 2014-2020 Intel Corporation
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #===============================================================================
17 
18 #
19 # ! Content:
20 # ! Python example of CSR K-Means clustering in the distributed processing mode
21 # !*****************************************************************************
22 
23 #
24 
25 
26 #
27 
28 import sys, os
29 import numpy as np
30 
31 from daal import step1Local, step2Master, step3Master
32 import daal.algorithms.kmeans as kmeans
33 import daal.algorithms.kmeans.init as init
34 from daal import step1Local, step2Master, step2Local, step3Master, step4Local, step5Master
35 from daal.data_management import RowMergedNumericTable
36 
# Put the parent of this script's directory at the front of the import path
# (unless it is already listed) before importing the ``utils`` helpers below.
utils_folder = os.path.dirname(os.path.dirname(__file__))
utils_folder = os.path.realpath(os.path.abspath(utils_folder))
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
from utils import printNumericTable, createSparseTable
41 
# K-Means algorithm parameters
algorithmFPType = np.float32  # floating-point type passed as ``fptype`` to the algorithms
nClusters = 20                # number of clusters (centroids) requested
nIterations = 5               # number of centroid-update iterations
nBlocks = 4                   # number of data blocks processed in turn
nVectorsInBlock = 8000        # block size used to derive the step-1 totals and offsets

# Input files, one per data block, located under ../data/distributed
DAAL_PREFIX = os.path.join('..', 'data', 'distributed')
dataFileNames = [
    os.path.join(DAAL_PREFIX, fileName)
    for fileName in (
        'kmeans_csr_1.csv',
        'kmeans_csr_2.csv',
        'kmeans_csr_3.csv',
        'kmeans_csr_4.csv',
    )
]
54 
55 
def loadData(files):
    """Build one sparse table per input file.

    Every path in ``files`` is handed to ``createSparseTable`` with
    ``ntype=np.float32``.  The tables are returned as a list in the same
    order as ``files``.
    """
    tables = []
    for path in files:
        tables.append(createSparseTable(path, ntype=np.float32))
    return tables
58 
59 
def initStep1(data, method):
    """Run step 1 of the distributed initialization over the data blocks.

    A ``step1Local`` algorithm is created for each of the ``nBlocks`` blocks,
    parameterized with the total vector count (``nBlocks * nVectorsInBlock``)
    and the block's offset (``index * nVectorsInBlock``).

    Returns the partial centroids of the first block whose result is truthy,
    or ``None`` when no block yields one.
    """
    totalVectors = nBlocks * nVectorsInBlock
    for blockIndex in range(nBlocks):
        blockOffset = blockIndex * nVectorsInBlock
        step1 = kmeans.init.Distributed(
            step1Local, nClusters, totalVectors, blockOffset,
            fptype=algorithmFPType, method=method,
        )
        step1.input.set(kmeans.init.data, data[blockIndex])
        partialCentroids = step1.compute().get(kmeans.init.partialCentroids)
        if partialCentroids:
            # Stop at the first block that produced centroids.
            return partialCentroids
    return None
70 
71 
def initStep23(data, localNodeData, step2Input, step3, bFirstIteration, method):
    """Run step 2 on every data block, feed step 3, and compute step 3.

    For each block a ``step2Local`` algorithm is run with ``step2Input`` as
    its step input.  On the first iteration the block's internal result is
    appended to ``localNodeData``; on later iterations the entry stored for
    that block is passed back in as internal input.  Each block's step-2
    output is added to ``step3`` under the block index.

    Returns the result of ``step3.compute()``.
    """
    for blockIndex, block in enumerate(data):
        step2 = kmeans.init.Distributed(
            step2Local, nClusters, bFirstIteration,
            fptype=algorithmFPType, method=method,
        )
        step2.input.set(kmeans.init.data, block)
        step2.input.setStepInput(kmeans.init.inputOfStep2, step2Input)
        if not bFirstIteration:
            # Reuse the per-block state saved during the first iteration.
            step2.input.setLocal(kmeans.init.internalInput, localNodeData[blockIndex])

        step2Result = step2.compute()
        if bFirstIteration:
            # Save the per-block state for the following iterations.
            localNodeData.append(step2Result.getLocal(kmeans.init.internalResult))

        step2Output = step2Result.getOutput(kmeans.init.outputOfStep2ForStep3)
        step3.input.add(kmeans.init.inputOfStep3FromStep2, blockIndex, step2Output)
    return step3.compute()
85 
86 
def initStep4(data, localNodeData, step3res, method):
    """Run step 4 on every data block that received an input from step 3.

    Parameters:
        data: sequence of per-block data tables.
        localNodeData: per-block internal data saved by step 2, indexed like
            ``data``.
        step3res: result of step 3; queried per block index for
            ``outputOfStep3ForStep4``.
        method: initialization method passed to the step-4 algorithm.

    Returns:
        ``None`` when no block had a step-4 input, the single step-4 output
        when exactly one block had one, otherwise a ``RowMergedNumericTable``
        holding all the outputs in block order.
    """
    aRes = []
    for i, block in enumerate(data):
        # Get an input for step 4 on this node if any
        step3Output = step3res.getOutput(kmeans.init.outputOfStep3ForStep4, i)
        if not step3Output:
            continue

        # Create an algorithm object for the step 4
        step4 = kmeans.init.Distributed(step4Local, nClusters, fptype=algorithmFPType, method=method)
        # Set the input data to the algorithm
        step4.input.setInput(kmeans.init.data, block)
        step4.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        step4.input.setStepInput(kmeans.init.inputOfStep4FromStep3, step3Output)
        # Compute exactly once and keep the result; compute() returns the
        # result object, so a separate preliminary compute() call is not needed.
        aRes.append(step4.compute().get(kmeans.init.outputOfStep4))

    if not aRes:
        return None
    if len(aRes) == 1:
        return aRes[0]
    # For parallelPlus algorithm
    pMerged = RowMergedNumericTable()
    for r in aRes:
        pMerged.addNumericTable(r)
    return pMerged
115 
116 
def initCentroids_plusPlusCSR(data):
    """Compute initial centroids with the ``plusPlusCSR`` method.

    Step 1 provides the first table of centroids.  Then, for each value of
    ``iCenter`` from 1 to ``nClusters - 1``, steps 2/3 and step 4 are run and
    the table returned by step 4 is appended.

    Returns the ``RowMergedNumericTable`` that collects all added tables.
    """
    method = kmeans.init.plusPlusCSR

    # Per-block internal data, filled in by the first pass of step 2
    localNodeData = []
    # Accumulates the centroids found so far
    pCentroids = RowMergedNumericTable()

    # Step 1 on the local nodes
    pNewCentroids = initStep1(data, method)
    pCentroids.addNumericTable(pNewCentroids)

    # A single step-3 algorithm object is shared by all iterations
    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)
    for iCenter in range(1, nClusters):
        isFirstPass = iCenter == 1
        # Steps 2 and 3
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, isFirstPass, method=method)
        # Step 4
        pNewCentroids = initStep4(data, localNodeData, step3res, method=method)
        pCentroids.addNumericTable(pNewCentroids)
    return pCentroids
135 
136 
def initCentroids_parallelPlusCSR(data):
    """Compute initial centroids with the ``parallelPlusCSR`` method.

    Step 1 yields the first candidates.  For each of
    ``step5.parameter.nRounds`` rounds, steps 2/3 and step 4 run and the new
    candidates are added to the step-5 input.  A final step 2 is then run per
    block with ``outputForStep5Required`` enabled, and its output, together
    with the step-3 output for step 5, is passed to step 5.

    Returns the centroids from the finalized step-5 result.
    """
    method = kmeans.init.parallelPlusCSR

    # Per-block internal data, filled in by the first pass of step 2
    localNodeData = []
    # Step 1 on the local nodes
    pNewCentroids = initStep1(data, method=method)

    # Step 5 collects every batch of candidate centroids
    step5 = kmeans.init.Distributed(step5Master, nClusters, fptype=algorithmFPType, method=method)
    step5.input.add(kmeans.init.inputCentroids, pNewCentroids)
    # A single step-3 algorithm object is shared by all rounds
    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)

    for iRound in range(step5.parameter.nRounds):
        isFirstRound = iRound == 0
        # Steps 2 and 3
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, isFirstRound, method=method)
        # Step 4
        pNewCentroids = initStep4(data, localNodeData, step3res, method=method)
        step5.input.add(kmeans.init.inputCentroids, pNewCentroids)

    # One more step 2, this time producing the output consumed by step 5
    for blockIndex in range(nBlocks):
        step2 = kmeans.init.Distributed(step2Local, nClusters, False, fptype=algorithmFPType, method=method)
        step2.parameter.outputForStep5Required = True
        step2.input.setInput(kmeans.init.data, data[blockIndex])
        step2.input.setLocal(kmeans.init.internalInput, localNodeData[blockIndex])
        step2.input.setStepInput(kmeans.init.inputOfStep2, pNewCentroids)
        step2Output = step2.compute().getOutput(kmeans.init.outputOfStep2ForStep5)
        step5.input.add(kmeans.init.inputOfStep5FromStep2, step2Output)

    step3Output = step3res.getStepOutput(kmeans.init.outputOfStep3ForStep5)
    step5.input.setStepInput(kmeans.init.inputOfStep5FromStep3, step3Output)
    step5.compute()
    return step5.finalizeCompute().get(kmeans.init.centroids)
170 
171 
def initCentroids(data, method):
    """Dispatch to the initialization routine that matches ``method``.

    Parameters:
        data: sequence of per-block data tables, forwarded unchanged.
        method: ``kmeans.init.parallelPlusCSR`` or ``kmeans.init.plusPlusCSR``.

    Returns:
        The initial centroids produced by the selected routine.

    Raises:
        AssertionError: if ``method`` is neither of the supported methods.
    """
    if method == kmeans.init.parallelPlusCSR:
        return initCentroids_parallelPlusCSR(data)
    if method == kmeans.init.plusPlusCSR:
        return initCentroids_plusPlusCSR(data)
    # Raise explicitly instead of ``assert False``: assert statements are
    # removed under ``python -O``, which would make this function silently
    # return None for an unknown method.
    raise AssertionError("Unknown method for initCentroids")
178 
179 
def calculateCentroids(initialCentroids, data):
    """Run distributed CSR K-Means from ``initialCentroids`` and print results.

    Parameters:
        initialCentroids: table of starting centroids.
        data: sequence of per-block data tables; blocks ``0 .. nBlocks - 1``
            are used.

    For ``nIterations`` iterations, a ``step1Local`` algorithm is run on every
    block with the current centroids, the partial results are added to the
    ``step2Master`` algorithm, and the centroids and objective function are
    taken from its finalized result.  Afterwards a batch algorithm is run per
    block to obtain the assignments.  The assignments of the first block, the
    centroids and the objective function value are printed; nothing is
    returned.
    """
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters, fptype=algorithmFPType, method=kmeans.lloydCSR)

    assignments = []
    centroids = initialCentroids
    objectiveFunction = None

    # Calculate centroids
    for _ in range(nIterations):
        for i in range(nBlocks):
            # Create an algorithm object for the K-Means algorithm.
            # The keyword is ``method`` (not ``methods``), as in every other
            # algorithm construction in this file.
            localAlgorithm = kmeans.Distributed(step1Local, nClusters, False,
                                                fptype=algorithmFPType, method=kmeans.lloydCSR)

            # Set the input data to the algorithm
            localAlgorithm.input.set(kmeans.data, data[i])
            localAlgorithm.input.set(kmeans.inputCentroids, centroids)

            masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

        masterAlgorithm.compute()
        res = masterAlgorithm.finalizeCompute()

        centroids = res.get(kmeans.centroids)
        objectiveFunction = res.get(kmeans.objectiveFunction)

    # Calculate assignments
    for i in range(nBlocks):
        # Create an algorithm object for the K-Means algorithm.
        # The keyword is ``fptype`` (not ``fptyep``), so the configured
        # floating-point type is actually passed.
        localAlgorithm = kmeans.Batch(nClusters, 0, fptype=algorithmFPType, method=kmeans.lloydCSR)

        # Set the input data to the algorithm
        localAlgorithm.input.set(kmeans.data, data[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)

        assignments.append(localAlgorithm.compute().get(kmeans.assignments))

    # Print the clusterization results
    printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10)
    printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10)
    printNumericTable(objectiveFunction, "Objective function value:")
223 
224 
def runKMeans(data, method, methodName):
    """Print the init method name, initialize centroids, then run K-Means.

    ``methodName`` is used only for the printed banner.  ``method`` selects
    the initialization routine through ``initCentroids``, and the resulting
    centroids are handed to ``calculateCentroids`` together with ``data``.
    """
    banner = "K-means init parameters: method = " + str(methodName)
    print(banner)
    initialCentroids = initCentroids(data, method=method)
    calculateCentroids(initialCentroids, data)
229 
230 
if __name__ == "__main__":
    # Load the blocks once and run the example for both initialization methods.
    data = loadData(dataFileNames)
    initMethods = (
        (kmeans.init.plusPlusCSR, "plusPlusCSR"),
        (kmeans.init.parallelPlusCSR, "parallelPlusCSR"),
    )
    for initMethod, initMethodName in initMethods:
        runKMeans(data, initMethod, initMethodName)

For more complete information about compiler optimizations, see our Optimization Notice.