32 if sys.version[0] ==
'2':
39 from daal.data_management
import Compressor_Zlib, Decompressor_Zlib, level9, DecompressionStream, CompressionStream
41 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
42 if utils_folder
not in sys.path:
43 sys.path.insert(0, utils_folder)
44 from utils
import getCRC32, readTextFile
46 datasetFileName = os.path.join(
'..',
'data',
'online',
'logitboost_train.csv')
50 sendReceiveQueue = Queue.Queue()
52 maxDataBlockSize = 16384
53 userDefinedBlockSize = 7000
55 def getDataBlock(sentDataStream, availableDataSize):
56 cur_pos = sentDataStream.size - availableDataSize
59 if availableDataSize >= maxDataBlockSize:
60 return (sentDataStream[cur_pos:cur_pos + maxDataBlockSize], availableDataSize - maxDataBlockSize)
61 elif availableDataSize < maxDataBlockSize
and availableDataSize > 0:
62 return (sentDataStream[cur_pos:cur_pos + availableDataSize], 0)
66 def sendDataBlock(block):
67 currentBlock = np.copy(block)
69 sendReceiveQueue.put(currentBlock)
72 def receiveDataBlock():
74 if sendReceiveQueue.empty():
77 return np.copy(sendReceiveQueue.get())
80 def printCRC32(sentDataStream, receivedDataStream):
82 crcSentDataStream = getCRC32(sentDataStream)
83 crcReceivedDataStream = getCRC32(receivedDataStream)
85 print(
"\nCompression example program results:\n")
87 print(
"Input data checksum: 0x{:02X}".format(crcSentDataStream))
88 print(
"Received data checksum: 0x{:02X}".format(crcReceivedDataStream))
90 if sentDataStream.size != receivedDataStream.size:
91 print(
"ERROR: Received data size mismatches with the sent data size")
93 elif crcSentDataStream != crcReceivedDataStream:
94 print(
"ERROR: Received data CRC mismatches with the sent data CRC")
96 print(
"OK: Received data CRC matches with the sent data CRC")
99 if __name__ ==
"__main__":
101 sentDataStream = readTextFile(datasetFileName)
104 compressor = Compressor_Zlib()
105 compressor.parameter.gzHeader =
True
106 compressor.parameter.level = level9
109 compressionStream = CompressionStream(compressor)
112 (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, sentDataStream.size)
113 while uncompressedDataBlock
is not None:
115 compressionStream.push_back(uncompressedDataBlock)
118 compressedBlocks = compressionStream.getCompressedBlocksCollection()
121 for i
in range(compressedBlocks.size()):
123 sendDataBlock(compressedBlocks[i].getArray())
126 (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, availableDataSize)
129 decompressor = Decompressor_Zlib()
130 decompressor.parameter.gzHeader =
True
133 decompressionStream = DecompressionStream(decompressor)
139 receivedDataStream = np.empty(0, dtype=np.uint8)
140 tmp_block = np.empty(userDefinedBlockSize, dtype=np.uint8)
143 compressedDataBlock = receiveDataBlock()
145 while compressedDataBlock
is not None:
147 decompressionStream.push_back(compressedDataBlock)
152 readSize = decompressionStream.copyDecompressedArray(tmp_block)
156 receivedDataStream = np.concatenate((receivedDataStream, tmp_block[:readSize]))
159 compressedDataBlock = receiveDataBlock()
162 printCRC32(sentDataStream, receivedDataStream)