Python* API Reference for Intel® Data Analytics Acceleration Library 2020 Update 1

impl_als_csr_distr.py

1 # file: impl_als_csr_distr.py
2 #===============================================================================
3 # Copyright 2014-2020 Intel Corporation
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #===============================================================================
17 
18 
19 
20 
21 import os
22 import sys
23 
24 import numpy as np
25 
26 from daal import step1Local, step2Local, step2Master, step3Local, step4Local
27 import daal.algorithms.implicit_als.prediction.ratings as ratings
28 import daal.algorithms.implicit_als.training as training
29 import daal.algorithms.implicit_als.training.init as init
30 from daal.data_management import KeyValueDataCollection, HomogenNumericTable
31 
32 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
33 if utils_folder not in sys.path:
34  sys.path.insert(0, utils_folder)
35 from utils import createSparseTable, printALSRatings
36 
37 DAAL_PREFIX = os.path.join('..', 'data')
38 
39 # Input data set parameters
40 nBlocks = 4
41 
42 # Number of observations in transposed training data set blocks
43 trainDatasetFileNames = [
44  os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_1.csv'),
45  os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_2.csv'),
46  os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_3.csv'),
47  os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_4.csv')
48 ]
49 
50 usersPartition = [0] * 1
51 usersPartition[0] = nBlocks
52 
53 userOffsets = [0] * nBlocks
54 itemOffsets = [0] * nBlocks
55 
56 # Algorithm parameters
57 nUsers = 46 # Full number of users
58 nFactors = 2 # Number of factors
59 maxIterations = 5 # Number of iterations in the implicit ALS training algorithm
60 
61 dataTable = [0] * nBlocks
62 transposedDataTable = [0] * nBlocks
63 
64 predictedRatings = [[0] * nBlocks for x in range(nBlocks)]
65 
66 userStep3LocalInput = [0] * nBlocks
67 itemStep3LocalInput = [0] * nBlocks
68 
69 itemsPartialResultLocal = [0] * nBlocks
70 usersPartialResultLocal = [0] * nBlocks
71 
72 def readData(block):
73  global dataTable
74 
75  # Read trainDatasetFileName from a file and create a numeric table to store the input data
76  dataTable[block] = createSparseTable(trainDatasetFileNames[block])
77 
78 
79 def initializeStep1Local(block):
80  global itemsPartialResultLocal
81  global itemStep3LocalInput
82  global userOffsets
83 
84  # Create an algorithm object to initialize the implicit ALS model with the fastCSR method
85  initAlgorithm = init.Distributed(step=step1Local, method=init.fastCSR)
86  initAlgorithm.parameter.fullNUsers = nUsers
87  initAlgorithm.parameter.nFactors = nFactors
88  initAlgorithm.parameter.seed += block
89  usersPartitionArray = np.array(usersPartition, dtype=np.float64)
90  usersPartitionArray.shape = (1, 1)
91 
92  initAlgorithm.parameter.partition = HomogenNumericTable(usersPartitionArray)
93 
94  # Pass a training data set to the algorithm
95  initAlgorithm.input.set(init.data, dataTable[block])
96 
97  # Initialize the implicit ALS model
98  partialResult = initAlgorithm.compute()
99  itemStep3LocalInput[block] = partialResult.getCollection(init.outputOfInitForComputeStep3)
100  userOffsets[block] = partialResult.getCollection(init.offsets, block)
101  partialModelLocal = partialResult.getPartialModel(init.partialModel)
102 
103  itemsPartialResultLocal[block] = training.DistributedPartialResultStep4()
104  itemsPartialResultLocal[block].set(training.outputOfStep4ForStep1, partialModelLocal)
105 
106  return partialResult.getTablesCollection(init.outputOfStep1ForStep2)
107 
108 def initializeStep2Local(block, initStep2LocalInput):
109  global transposedDataTable
110  global userStep3LocalInput
111  global itemOffsets
112  # Create an algorithm object to initialize the implicit ALS model with the fastCSR method
113  initAlgorithm = init.Distributed(step=step2Local, method=init.fastCSR)
114 
115  initAlgorithm.input.set(init.inputOfStep2FromStep1, initStep2LocalInput)
116 
117  # Initialize the implicit ALS model
118  partialResult = initAlgorithm.compute()
119 
120  transposedDataTable[block] = partialResult.getTable(init.transposedData)
121  userStep3LocalInput[block] = partialResult.getCollection(init.outputOfInitForComputeStep3)
122  itemOffsets[block] = partialResult.getCollection(init.offsets, block)
123 
124 def initializeModel():
125  initStep1LocalResult = [0] * nBlocks
126 
127  for i in range(nBlocks):
128  initStep1LocalResult[i] = initializeStep1Local(i)
129 
130  initStep2LocalInput = [0] * nBlocks
131 
132  for i in range(nBlocks):
133  initStep2LocalInput[i] = KeyValueDataCollection()
134  for j in range(nBlocks):
135  initStep2LocalInput[i][j] = initStep1LocalResult[j][i]
136 
137  for i in range(nBlocks):
138  initializeStep2Local(i, initStep2LocalInput[i])
139 
140 
141 def computeStep1Local(partialResultLocal):
142 
143  # Create an algorithm object to perform first step of the implicit ALS training algorithm on local-node data
144  algorithm = training.Distributed(step=step1Local)
145  algorithm.parameter.nFactors = nFactors
146 
147  # Set input objects for the algorithm
148  algorithm.input.set(training.partialModel, partialResultLocal.get(training.outputOfStep4ForStep1))
149 
150  # Compute partial results of the first step on local nodes
151  # DistributedPartialResultStep1 class from training
152  return algorithm.compute()
153 
154 
155 def computeStep2Master(step1LocalResult):
156 
157  # Create an algorithm object to perform second step of the implicit ALS training algorithm
158  algorithm = training.Distributed(step=step2Master)
159  algorithm.parameter.nFactors = nFactors
160 
161  # Set the partial results of the first local step of distributed computations
162  # as input for the master-node algorithm
163  for i in range(nBlocks):
164  algorithm.input.add(training.inputOfStep2FromStep1, step1LocalResult[i])
165 
166  # Compute a partial result on the master node from the partial results on local nodes
167  # DistributedPartialResultStep2 class from training
168  res = algorithm.compute()
169  return res.get(training.outputOfStep2ForStep4)
170 
171 
172 def computeStep3Local(offsets, partialResultLocal, step3LocalInput):
173 
174  # Create an algorithm object to perform third step of the implicit ALS training algorithm on local-node data
175  algorithm = training.Distributed(step=step3Local)
176  algorithm.parameter.nFactors = nFactors
177 
178  # Set input objects for the algorithm
179  algorithm.input.setModel(training.partialModel, partialResultLocal.get(training.outputOfStep4ForStep3))
180  algorithm.input.setCollection(training.inputOfStep3FromInit, step3LocalInput)
181  algorithm.input.setTable(training.offset, offsets)
182 
183  # Compute partial results of the third step on local nodes
184  # DistributedPartialResultStep3 class from training
185  res = algorithm.compute()
186  return res.get(training.outputOfStep3ForStep4)
187 
188 
189 def computeStep4Local(dataTable, step2MasterResult, step4LocalInput):
190 
191  # Create an algorithm object to perform fourth step of the implicit ALS training algorithm on local-node data
192  algorithm = training.Distributed(step=step4Local)
193  algorithm.parameter.nFactors = nFactors
194 
195  # Set input objects for the algorithm
196  algorithm.input.setModels(training.partialModels, step4LocalInput)
197  algorithm.input.setTable(training.partialData, dataTable)
198  algorithm.input.setTable(training.inputOfStep4FromStep2, step2MasterResult)
199 
200  # Build the implicit ALS partial model on the local node
201  # DistributedPartialResultStep4 class from training
202  return algorithm.compute()
203 
204 
205 def trainModel():
206 
207  step1LocalResult = [0] * nBlocks
208  step3LocalResult = [0] * nBlocks
209  step4LocalInput = [0] * nBlocks
210 
211  for i in range(nBlocks):
212  step4LocalInput[i] = KeyValueDataCollection()
213 
214  for iteration in range(maxIterations):
215 
216  # Update partial users factors
217  for i in range(nBlocks):
218  step1LocalResult[i] = computeStep1Local(itemsPartialResultLocal[i])
219 
220  step2MasterResult = computeStep2Master(step1LocalResult)
221 
222  for i in range(nBlocks):
223  step3LocalResult[i] = computeStep3Local(itemOffsets[i], itemsPartialResultLocal[i], itemStep3LocalInput[i])
224 
225  # Prepare input objects for the fourth step of the distributed algorithm
226  for i in range(nBlocks):
227  for j in range(nBlocks):
228  step4LocalInput[i][j] = step3LocalResult[j][i]
229 
230  for i in range(nBlocks):
231  usersPartialResultLocal[i] = computeStep4Local(transposedDataTable[i], step2MasterResult, step4LocalInput[i])
232 
233  # Update partial items factors
234  for i in range(nBlocks):
235  step1LocalResult[i] = computeStep1Local(usersPartialResultLocal[i])
236 
237  step2MasterResult = computeStep2Master(step1LocalResult)
238 
239  for i in range(nBlocks):
240  step3LocalResult[i] = computeStep3Local(userOffsets[i], usersPartialResultLocal[i], userStep3LocalInput[i])
241 
242  # Prepare input objects for the fourth step of the distributed algorithm
243  for i in range(nBlocks):
244  for j in range(nBlocks):
245  step4LocalInput[i][j] = step3LocalResult[j][i]
246 
247  for i in range(nBlocks):
248  itemsPartialResultLocal[i] = computeStep4Local(dataTable[i], step2MasterResult, step4LocalInput[i])
249 
250 
251 def testModel(i, j):
252  # Create an algorithm object to predict ratings based in the implicit ALS partial models
253  algorithm = ratings.Distributed(step=step1Local, method=ratings.defaultDense)
254  algorithm.parameter.nFactors = nFactors
255 
256  # Set input objects for the algorithm
257  algorithm.input.set(ratings.usersPartialModel, usersPartialResultLocal[i].get(training.outputOfStep4))
258  algorithm.input.set(ratings.itemsPartialModel, itemsPartialResultLocal[j].get(training.outputOfStep4))
259 
260  # Predict ratings and retrieve the algorithm results
261  algorithm.compute()
262 
263  # Result class from ratings
264  res = algorithm.finalizeCompute()
265  return res.get(ratings.prediction)
266 
267 
268 def printResults():
269 
270  for i in range(nBlocks):
271  for j in range(nBlocks):
272  print("Ratings for users block {}, items block {} :".format(i, j))
273  printALSRatings(userOffsets[i], itemOffsets[j], predictedRatings[i][j])
274 
275 if __name__ == "__main__":
276  for i in range(nBlocks):
277  readData(i)
278 
279  initializeModel()
280 
281  trainModel()
282 
283  for i in range(nBlocks):
284  for j in range(nBlocks):
285  predictedRatings[i][j] = testModel(i, j)
286 
287  printResults()

For more complete information about compiler optimizations, see our Optimization Notice.