Python* API Reference for Intel® Data Analytics Acceleration Library 2020 Update 1

compressor.py

# file: compressor.py
#===============================================================================
# Copyright 2014-2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#===============================================================================

#
# ! Content:
# ! Python example of a compressor
# !
# !*****************************************************************************

#


#

import os
import sys

# The standard-library queue module is named ``Queue`` on Python 2 and
# ``queue`` on Python 3; bind it to one alias so the rest of the file can use
# ``Queue.Queue`` either way. ``sys.version_info`` is compared numerically
# instead of inspecting the first character of the version string.
if sys.version_info[0] == 2:
    import Queue as Queue
else:
    import queue as Queue

import numpy as np

from daal.data_management import Compressor_Zlib, Decompressor_Zlib

# Put the parent of this file's directory on the import path so that the
# ``utils`` module imported below can be found.
utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
from utils import getCRC32, readTextFile

# Input data file (path is relative to the current working directory)
datasetFileName = os.path.join('..', 'data', 'batch', 'logitboost_train.csv')

# Queue for sending and receiving compressed data blocks
sendReceiveQueue = Queue.Queue()

maxDataBlockSize = 16384  # Maximum size of a data block

def getUncompressedDataBlock(sentDataStream, availableDataSize, blockSize=None):
    """Return the next slice of the input stream that is still to be sent.

    Args:
        sentDataStream: One-dimensional array holding the whole input; only
            its ``size`` attribute and slicing are used.
        availableDataSize: Number of trailing elements of ``sentDataStream``
            that have not been handed out yet.
        blockSize: Maximum number of elements per returned block. Defaults to
            the module-level ``maxDataBlockSize``.

    Returns:
        A tuple ``(block, remaining)`` where ``block`` is a slice of
        ``sentDataStream`` with at most ``blockSize`` elements and
        ``remaining`` is the number of elements left after it, or
        ``(None, None)`` when no data is available.

    Raises:
        ValueError: If the effective block size is not positive; such a size
            would yield empty blocks forever without consuming any data.
    """
    if blockSize is None:
        blockSize = maxDataBlockSize
    if blockSize <= 0:
        raise ValueError("blockSize must be positive")

    if availableDataSize <= 0:
        return (None, None)

    # The unsent data is the tail of the stream, so the block starts where
    # the already-sent prefix ends.
    cur_pos = sentDataStream.size - availableDataSize
    currentSize = min(availableDataSize, blockSize)

    # return next slice of array and remaining datasize
    return (sentDataStream[cur_pos:cur_pos + currentSize], availableDataSize - currentSize)


def sendCompressedDataBlock(block):
    """Put a copy of a compressed block on the shared send/receive queue.

    Args:
        block: Array (or array view) holding the compressed bytes to send.
    """
    # Copy before queueing: the caller passes a view into a buffer that it
    # reuses for the next compression step, so the queued block must not
    # share memory with it.
    currentBlock = np.copy(block)
    # Push the current compressed block to the queue
    sendReceiveQueue.put(currentBlock)


def receiveCompressedDataBlock():
    """Take the next compressed block off the shared send/receive queue.

    Returns:
        A copy of the next queued block, or None when the queue is empty.
    """
    try:
        # Non-blocking fetch: an empty queue means there is nothing left to
        # receive, and a plain get() would block on it.
        nextBlock = sendReceiveQueue.get_nowait()
    except Queue.Empty:
        # Stop at the end of the queue
        return None
    # Receive and copy the current compressed block from the queue
    return np.copy(nextBlock)


def printCRC32(sentDataStream, receivedDataStream):
    """Print the checksums of both streams and whether they agree.

    Args:
        sentDataStream: Array holding the original input data.
        receivedDataStream: Array holding the data obtained after the
            compress/decompress round trip.

    A size mismatch is reported in preference to a checksum mismatch; the
    checksums themselves are always printed.
    """
    # Compute checksums for full input data and full received data
    crcSentDataStream = getCRC32(sentDataStream)
    crcReceivedDataStream = getCRC32(receivedDataStream)

    print("\nCompression example program results:\n")

    print("Input data checksum: 0x{:02X}".format(crcSentDataStream))
    print("Received data checksum: 0x{:02X}".format(crcReceivedDataStream))

    if sentDataStream.size != receivedDataStream.size:
        print("ERROR: Received data size mismatches with the sent data size")

    elif crcSentDataStream != crcReceivedDataStream:
        print("ERROR: Received data CRC mismatches with the sent data CRC")
    else:
        print("OK: Received data CRC matches with the sent data CRC")


# Script entry point: compress the input file block by block, pass the
# compressed blocks through the queue, decompress them into a second buffer
# and compare the result with the input.
if __name__ == "__main__":

    # Read data from a file
    sentDataStream = readTextFile(datasetFileName)

    # Allocate buffers for compressed and received data
    compressedDataBlock = np.empty(maxDataBlockSize, dtype=np.uint8)
    receivedDataStream = np.empty(sentDataStream.size, dtype=np.uint8)

    # Create a compressor
    compressor = Compressor_Zlib()

    # Get the first data block for compression
    (uncompressedDataBlock, availableDataSize) = getUncompressedDataBlock(sentDataStream, sentDataStream.size)
    while uncompressedDataBlock is not None:
        # Associate data to compress with the compressor
        compressor.setInputDataBlock(uncompressedDataBlock, 0)

        # Memory for a compressed block might not be enough to compress the input block at once
        while True:
            # Compress uncompressedDataBlock to compressedDataBlock
            compressor.run(compressedDataBlock, 0)

            # Get the actual size of a compressed block
            compressedDataView = compressedDataBlock[0:compressor.getUsedOutputDataBlockSize()]

            # Send the current compressed block
            sendCompressedDataBlock(compressedDataView)

            # Check if an additional data block is needed to complete compression
            if not compressor.isOutputDataBlockFull():
                break

        # Receive the next data block for compression
        (uncompressedDataBlock, availableDataSize) = getUncompressedDataBlock(sentDataStream, availableDataSize)

    # Create a decompressor
    decompressor = Decompressor_Zlib()

    # Receive compressed data by blocks
    compressedDataBlock = receiveCompressedDataBlock()
    offset = 0

    while compressedDataBlock is not None:
        # Associate compressed data with the decompressor
        decompressor.setInputDataBlock(compressedDataBlock, 0)

        # Decompress an incoming block to the end of receivedDataStream
        decompressor.run(receivedDataStream, offset)

        # Update the size of actual data in receivedDataStream
        offset += decompressor.getUsedOutputDataBlockSize()

        # Receive next block
        compressedDataBlock = receiveCompressedDataBlock()

    # Compute and print checksums for sentDataStream and receivedDataStream.
    # Only the first `offset` bytes were written by the decompressor; the
    # buffer itself was allocated with the sent size, so passing it whole
    # would make the size comparison in printCRC32 always succeed.
    printCRC32(sentDataStream, receivedDataStream[0:offset])

For more complete information about compiler optimizations, see our Optimization Notice.