26 from daal
import step1Local, step2Local, step2Master, step3Local, step4Local
27 import daal.algorithms.implicit_als.prediction.ratings
as ratings
28 import daal.algorithms.implicit_als.training
as training
29 import daal.algorithms.implicit_als.training.init
as init
30 from daal.data_management
import KeyValueDataCollection, HomogenNumericTable
# Make the shared example helper module `utils` importable. It is looked up in
# the parent of this script's directory, which is prepended to sys.path once.
utils_folder = os.path.realpath(
    os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
)
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)

from utils import createSparseTable, printALSRatings  # noqa: E402 -- needs the sys.path tweak above
# Root of the example data, relative to the current working directory.
DAAL_PREFIX = os.path.join('..', 'data')

# One transposed-CSR training file per data block, in block order.
trainDatasetFileNames = [
    os.path.join(DAAL_PREFIX, 'distributed', fileName)
    for fileName in (
        'implicit_als_trans_csr_1.csv',
        'implicit_als_trans_csr_2.csv',
        'implicit_als_trans_csr_3.csv',
        'implicit_als_trans_csr_4.csv',
    )
]
# NOTE(review): nBlocks is bound elsewhere in the file (not visible here) and
# must be defined before these lines run.

# Users partition handed to the init algorithm: a single entry, the number of blocks.
usersPartition = [nBlocks]

# Per-block offset collections, filled in by the initialization steps.
userOffsets = [0] * nBlocks
itemOffsets = [0] * nBlocks

# Per-block training data tables and the transposed data produced by init step 2.
dataTable = [0] * nBlocks
transposedDataTable = [0] * nBlocks

# predictedRatings[i][j] holds the predicted ratings for users block i and items block j.
predictedRatings = [[0] * nBlocks for _ in range(nBlocks)]

# Per-block step-3 input collections produced during initialization.
userStep3LocalInput = [0] * nBlocks
itemStep3LocalInput = [0] * nBlocks

# Latest per-block step-4 partial results for items and users.
itemsPartialResultLocal = [0] * nBlocks
usersPartialResultLocal = [0] * nBlocks
76 dataTable[block] = createSparseTable(trainDatasetFileNames[block])
def initializeStep1Local(block):
    """Run step 1 of the distributed implicit ALS initialization for one block.

    Builds the initial local partial model from ``dataTable[block]`` and stores
    the per-block state later used by training. The module-level lists below
    are mutated in place (by index), so no ``global`` statement is needed.

    Side effects:
        itemStep3LocalInput[block]: step-3 input collection for the items side.
        userOffsets[block]: offsets collection returned for this block.
        itemsPartialResultLocal[block]: a ``DistributedPartialResultStep4``
            wrapping the initial partial model, so the training loop can feed
            it to step 1 just like a step-4 result.

    Args:
        block: Index of the local data block.

    Returns:
        The ``init.outputOfStep1ForStep2`` tables collection; initializeModel
        indexes it by block when assembling step-2 inputs.
    """
    initAlgorithm = init.Distributed(step=step1Local, method=init.fastCSR)
    initAlgorithm.parameter.fullNUsers = nUsers
    initAlgorithm.parameter.nFactors = nFactors
    # Shift the seed per block, presumably so each block draws different
    # random initial factors -- TODO confirm against DAAL docs.
    initAlgorithm.parameter.seed += block

    # The partition parameter is a single-column numeric table. Shape it from
    # the list length instead of hard-coding (1, 1), so a multi-entry partition
    # (per-block offsets) no longer fails. For the usual one-entry list
    # [nBlocks] this is still (1, 1).
    usersPartitionArray = np.array(usersPartition, dtype=np.float64)
    usersPartitionArray.shape = (len(usersPartition), 1)
    initAlgorithm.parameter.partition = HomogenNumericTable(usersPartitionArray)

    initAlgorithm.input.set(init.data, dataTable[block])

    partialResult = initAlgorithm.compute()
    itemStep3LocalInput[block] = partialResult.getCollection(init.outputOfInitForComputeStep3)
    userOffsets[block] = partialResult.getCollection(init.offsets, block)
    partialModelLocal = partialResult.getPartialModel(init.partialModel)

    # Wrap the initial model as a step-4 partial result so the first training
    # iteration consumes it exactly like the output of a previous iteration.
    itemsPartialResultLocal[block] = training.DistributedPartialResultStep4()
    itemsPartialResultLocal[block].set(training.outputOfStep4ForStep1, partialModelLocal)

    return partialResult.getTablesCollection(init.outputOfStep1ForStep2)
def initializeStep2Local(block, initStep2LocalInput):
    """Run step 2 of the distributed implicit ALS initialization for one block.

    Consumes the collection of step-1 outputs gathered for ``block`` and
    stores, by index into module-level lists:

    * ``transposedDataTable[block]``: the transposed data table,
    * ``userStep3LocalInput[block]``: the users-side step-3 input collection,
    * ``itemOffsets[block]``: the offsets collection for this block.

    Args:
        block: Index of the local data block.
        initStep2LocalInput: KeyValueDataCollection of step-1 outputs addressed
            to this block.

    Returns:
        None.
    """
    step2 = init.Distributed(step=step2Local, method=init.fastCSR)
    step2.input.set(init.inputOfStep2FromStep1, initStep2LocalInput)

    result = step2.compute()

    transposedDataTable[block] = result.getTable(init.transposedData)
    userStep3LocalInput[block] = result.getCollection(init.outputOfInitForComputeStep3)
    itemOffsets[block] = result.getCollection(init.offsets, block)
def initializeModel():
    """Run the two-step distributed initialization of the implicit ALS model.

    Step 1 runs on every block first. Its outputs are then regrouped so that
    block ``dst`` receives element ``[dst]`` of every block's step-1 result,
    keyed by the source block index. Finally step 2 runs on every block with
    its regrouped collection.
    """
    step1Outputs = [initializeStep1Local(block) for block in range(nBlocks)]

    # Build every step-2 input before running any step 2, as before.
    step2Inputs = []
    for dst in range(nBlocks):
        gathered = KeyValueDataCollection()
        for src in range(nBlocks):
            gathered[src] = step1Outputs[src][dst]
        step2Inputs.append(gathered)

    for block, gathered in enumerate(step2Inputs):
        initializeStep2Local(block, gathered)
def computeStep1Local(partialResultLocal):
    """Run implicit ALS training step 1 for one local block.

    Args:
        partialResultLocal: step-4 partial result (or the initial model wrapped
            as one) whose ``outputOfStep4ForStep1`` entry is the local model.

    Returns:
        The result object returned by ``compute()`` for step 1.
    """
    step1 = training.Distributed(step=step1Local)
    step1.parameter.nFactors = nFactors

    localModel = partialResultLocal.get(training.outputOfStep4ForStep1)
    step1.input.set(training.partialModel, localModel)

    return step1.compute()
def computeStep2Master(step1LocalResult):
    """Run implicit ALS training step 2 on the master.

    Args:
        step1LocalResult: per-block step-1 results; the first ``nBlocks``
            entries are added as master inputs in block order.

    Returns:
        The ``outputOfStep2ForStep4`` entry of the master result, which the
        caller passes to step 4 on every block.
    """
    master = training.Distributed(step=step2Master)
    master.parameter.nFactors = nFactors

    for block in range(nBlocks):
        master.input.add(training.inputOfStep2FromStep1, step1LocalResult[block])

    masterResult = master.compute()
    return masterResult.get(training.outputOfStep2ForStep4)
def computeStep3Local(offsets, partialResultLocal, step3LocalInput):
    """Run implicit ALS training step 3 for one local block.

    Args:
        offsets: offsets collection recorded for this block during initialization.
        partialResultLocal: step-4 partial result whose ``outputOfStep4ForStep3``
            entry is the local model.
        step3LocalInput: step-3 input collection produced by initialization.

    Returns:
        The ``outputOfStep3ForStep4`` entry of the step-3 result. The caller
        indexes it by destination block when building step-4 inputs.
    """
    step3 = training.Distributed(step=step3Local)
    step3.parameter.nFactors = nFactors

    localModel = partialResultLocal.get(training.outputOfStep4ForStep3)
    step3.input.setModel(training.partialModel, localModel)
    step3.input.setCollection(training.inputOfStep3FromInit, step3LocalInput)
    step3.input.setTable(training.offset, offsets)

    step3Result = step3.compute()
    return step3Result.get(training.outputOfStep3ForStep4)
def computeStep4Local(dataTable, step2MasterResult, step4LocalInput):
    """Run implicit ALS training step 4 for one local block.

    Args:
        dataTable: local data table for this block. It shadows the
            module-level ``dataTable`` list. The training loop passes the
            transposed table when updating users and the original table when
            updating items.
        step2MasterResult: the master step-2 output, shared by every block.
        step4LocalInput: KeyValueDataCollection of step-3 results addressed to
            this block.

    Returns:
        The step-4 partial result returned by ``compute()``. It becomes the new
        users or items partial result for this block.
    """
    step4 = training.Distributed(step=step4Local)
    step4.parameter.nFactors = nFactors

    step4.input.setModels(training.partialModels, step4LocalInput)
    step4.input.setTable(training.partialData, dataTable)
    step4.input.setTable(training.inputOfStep4FromStep2, step2MasterResult)

    return step4.compute()
207 step1LocalResult = [0] * nBlocks
208 step3LocalResult = [0] * nBlocks
209 step4LocalInput = [0] * nBlocks
211 for i
in range(nBlocks):
212 step4LocalInput[i] = KeyValueDataCollection()
214 for iteration
in range(maxIterations):
217 for i
in range(nBlocks):
218 step1LocalResult[i] = computeStep1Local(itemsPartialResultLocal[i])
220 step2MasterResult = computeStep2Master(step1LocalResult)
222 for i
in range(nBlocks):
223 step3LocalResult[i] = computeStep3Local(itemOffsets[i], itemsPartialResultLocal[i], itemStep3LocalInput[i])
226 for i
in range(nBlocks):
227 for j
in range(nBlocks):
228 step4LocalInput[i][j] = step3LocalResult[j][i]
230 for i
in range(nBlocks):
231 usersPartialResultLocal[i] = computeStep4Local(transposedDataTable[i], step2MasterResult, step4LocalInput[i])
234 for i
in range(nBlocks):
235 step1LocalResult[i] = computeStep1Local(usersPartialResultLocal[i])
237 step2MasterResult = computeStep2Master(step1LocalResult)
239 for i
in range(nBlocks):
240 step3LocalResult[i] = computeStep3Local(userOffsets[i], usersPartialResultLocal[i], userStep3LocalInput[i])
243 for i
in range(nBlocks):
244 for j
in range(nBlocks):
245 step4LocalInput[i][j] = step3LocalResult[j][i]
247 for i
in range(nBlocks):
248 itemsPartialResultLocal[i] = computeStep4Local(dataTable[i], step2MasterResult, step4LocalInput[i])
253 algorithm = ratings.Distributed(step=step1Local, method=ratings.defaultDense)
254 algorithm.parameter.nFactors = nFactors
257 algorithm.input.set(ratings.usersPartialModel, usersPartialResultLocal[i].get(training.outputOfStep4))
258 algorithm.input.set(ratings.itemsPartialModel, itemsPartialResultLocal[j].get(training.outputOfStep4))
264 res = algorithm.finalizeCompute()
265 return res.get(ratings.prediction)
270 for i
in range(nBlocks):
271 for j
in range(nBlocks):
272 print(
"Ratings for users block {}, items block {} :".format(i, j))
273 printALSRatings(userOffsets[i], itemOffsets[j], predictedRatings[i][j])
275 if __name__ ==
"__main__":
276 for i
in range(nBlocks):
283 for i
in range(nBlocks):
284 for j
in range(nBlocks):
285 predictedRatings[i][j] = testModel(i, j)