def _extract_features_from_files(self, filenames):
    """Return one concatenated feature vector per image file in *filenames*.

    Every image is read with ``mpimg.imread`` and passed to
    ``self.extract_features`` using the feature parameters stored on the
    instance; the pieces returned for one image are joined with
    ``np.concatenate``.
    """
    all_features = []
    for filename in filenames:
        image = mpimg.imread(filename)
        features = self.extract_features(image, color_space=self.color_space,
                                         spatial_size=self.spatial_size,
                                         hist_bins=self.hist_bins,
                                         orient=self.orient,
                                         pix_per_cell=self.pix_per_cell,
                                         cell_per_block=self.cell_per_block,
                                         hog_channel=self.hog_channel,
                                         spatial_feat=self.spatial_feat,
                                         hist_feat=self.hist_feat,
                                         hog_feat=self.hog_feat)
        all_features.append(np.concatenate(features))
    return all_features

def __train(self):
    """Train the linear SVC on the car / not-car images and pickle the result.

    Images are globbed from ``<train_directory>/vehicles/*/*`` (label 1) and
    ``<train_directory>/non-vehicles/*/*`` (label 0). The features are
    standardised with a ``StandardScaler`` fitted on the full data set, split
    80/20 into training and test sets, and used to fit a ``LinearSVC``.
    The classifier and scaler are stored on ``self.svc`` / ``self.X_scaler``
    and pickled to ``<train_directory>/<binary_filename>``.

    Raises:
        ValueError: if either image folder yields no files.
    """
    print ('Training the model ...')
    # Collect the training image paths for both classes
    car_filenames = glob.glob(self.__train_directory+'/vehicles/*/*')
    notcar_filenames = glob.glob(self.__train_directory+'/non-vehicles/*/*')
    if not car_filenames or not notcar_filenames:
        # Fail early with a readable message instead of an opaque
        # NumPy / scikit-learn shape error further down.
        raise ValueError('No training images found under ' + self.__train_directory)

    car_features = self._extract_features_from_files(car_filenames)
    notcar_features = self._extract_features_from_files(notcar_filenames)

    X = np.vstack((car_features, notcar_features)).astype(np.float64)
    # Fit a per-column scaler
    self.X_scaler = StandardScaler().fit(X)
    # Apply the scaler to X
    scaled_X = self.X_scaler.transform(X)

    # Define the labels vector: 1 for cars, 0 for everything else
    y = np.hstack((np.ones(len(car_features)), np.zeros(len(notcar_features))))

    # Split up data into randomized training and test sets
    rand_state = np.random.randint(0, 100)
    X_train, X_test, y_train, y_test = train_test_split(
        scaled_X, y, test_size=0.2, random_state=rand_state)

    print('Using:',self.orient,'orientations',self.pix_per_cell,
          'pixels per cell and', self.cell_per_block,'cells per block')
    print('Feature vector length:', len(X_train[0]))

    # Use a linear SVC
    self.svc = LinearSVC()
    self.svc.fit(X_train, y_train)
    # Check the score of the SVC
    print('Test Accuracy of SVC = ', round(self.svc.score(X_test, y_test), 4))

    # Pickle to save time for subsequent runs; the context manager makes
    # sure the file is flushed and closed even if dumping fails.
    binary = {"svc": self.svc, "X_scaler": self.X_scaler}
    with open(self.__train_directory + '/' + self.__binary_filename, "wb") as f:
        pickle.dump(binary, f)

def __load_binary(self):
    '''Load the previously trained classifier and scaler from the pickle file.'''
    # NOTE: pickle can execute arbitrary code while loading; only load
    # binaries that this program wrote itself.
    with open(self.__train_directory + '/' + self.__binary_filename, mode='rb') as f:
        binary = pickle.load(f)
    self.svc = binary['svc']
    self.X_scaler = binary['X_scaler']

def get_data(self):
    '''Return ``(svc, X_scaler)``, training them first if no pickle exists.

    When ``<train_directory>/<binary_filename>`` is a file it is loaded;
    otherwise the model is trained (which also writes that file).
    '''
    if os.path.isfile(self.__train_directory + '/' + self.__binary_filename):
        self.__load_binary()
    else:
        self.__train()
    return self.svc, self.X_scaler
# Define a single function that can extract features using hog sub-sampling and make predictions
def find_cars(self, img, plot=False):
    """Slide a 64x64 window over the search band and return car detections.

    The rows ``self.ystart:self.ystop`` of *img* are converted with
    ``self.convert_color(..., color_space='YCrCb')``, optionally shrunk by
    ``self.scale``, and HOG features are computed once per channel for the
    whole band. Each window reuses a slice of those HOG arrays, adds spatial
    and colour-histogram features, scales them with ``self.X_scaler`` and
    classifies them with ``self.svc``.

    Args:
        img: image array indexed as ``[row, col, channel]``; it is divided
            by 255 before processing (assumes 8-bit input - TODO confirm).
        plot: when truthy, show the searched and the detected windows with
            matplotlib.

    Returns:
        List of ``((x1, y1), (x2, y2))`` boxes, in the coordinates of *img*,
        for every window predicted as class 1.
    """
    bbox_list = []
    bbox_all_list = []

    draw_img = np.copy(img)
    img = img.astype(np.float32)/255

    img_tosearch = img[self.ystart:self.ystop,:,:]
    # NOTE(review): the colour space is hard-coded here while training uses
    # self.color_space - confirm both agree.
    ctrans_tosearch = self.convert_color(img_tosearch, color_space='YCrCb')
    if self.scale != 1:
        imshape = ctrans_tosearch.shape
        # Builtin int replaces the np.int alias removed in NumPy 1.24
        ctrans_tosearch = cv2.resize(
            ctrans_tosearch,
            (int(imshape[1]/self.scale), int(imshape[0]/self.scale)))

    ch1 = ctrans_tosearch[:,:,0]
    ch2 = ctrans_tosearch[:,:,1]
    ch3 = ctrans_tosearch[:,:,2]

    # Define blocks and steps as above
    nxblocks = (ch1.shape[1] // self.pix_per_cell)-1
    nyblocks = (ch1.shape[0] // self.pix_per_cell)-1

    # 64 was the original sampling rate, with 8 cells and 8 pix per cell
    window = 64
    nblocks_per_window = (window // self.pix_per_cell)-1
    cells_per_step = 2  # Instead of overlap, define how many cells to step
    nxsteps = (nxblocks - nblocks_per_window) // cells_per_step
    nysteps = (nyblocks - nblocks_per_window) // cells_per_step

    # Compute individual channel HOG features for the entire image
    hog1 = self.get_hog_features(ch1, self.orient, self.pix_per_cell, self.cell_per_block, feature_vec=False)
    hog2 = self.get_hog_features(ch2, self.orient, self.pix_per_cell, self.cell_per_block, feature_vec=False)
    hog3 = self.get_hog_features(ch3, self.orient, self.pix_per_cell, self.cell_per_block, feature_vec=False)

    # NOTE(review): x iterates nxsteps+1 positions but y only nysteps, so the
    # last row of windows is skipped - confirm this asymmetry is intended.
    for xb in range(nxsteps+1):
        for yb in range(nysteps):
            ypos = yb*cells_per_step
            xpos = xb*cells_per_step
            # Extract HOG for this patch
            hog_feat1 = hog1[ypos:ypos+nblocks_per_window, xpos:xpos+nblocks_per_window].ravel()
            hog_feat2 = hog2[ypos:ypos+nblocks_per_window, xpos:xpos+nblocks_per_window].ravel()
            hog_feat3 = hog3[ypos:ypos+nblocks_per_window, xpos:xpos+nblocks_per_window].ravel()
            hog_features = np.concatenate((hog_feat1, hog_feat2, hog_feat3))

            xleft = xpos*self.pix_per_cell
            ytop = ypos*self.pix_per_cell

            # Extract the image patch
            subimg = cv2.resize(ctrans_tosearch[ytop:ytop+window, xleft:xleft+window], (64,64))

            # Get color features
            spatial_features = self.bin_spatial(subimg, size=self.spatial_size)
            hist_features = self.color_hist(subimg, nbins=self.hist_bins)

            # Scale features and make a prediction
            test_features = self.X_scaler.transform(
                np.hstack((spatial_features, hist_features, hog_features)).reshape(1, -1))
            test_prediction = self.svc.predict(test_features)

            # Map the window back to the coordinates of the input image
            xbox_left = int(xleft*self.scale)
            ytop_draw = int(ytop*self.scale)
            win_draw = int(window*self.scale)
            bbox = ((xbox_left, ytop_draw+self.ystart),
                    (xbox_left+win_draw, ytop_draw+win_draw+self.ystart))

            if test_prediction == 1:
                bbox_list.append(bbox)
            bbox_all_list.append(bbox)

    if plot:
        draw_img_detected = np.copy(draw_img)
        # Draw all searched windows
        for bbox in bbox_all_list:
            cv2.rectangle(draw_img, bbox[0], bbox[1], (0,0,255), 3)
        # Draw only the windows classified as vehicles
        for bbox in bbox_list:
            cv2.rectangle(draw_img_detected, bbox[0], bbox[1], (0,0,255), 3)

        fig = plt.figure()
        plt.subplot(121)
        plt.imshow(draw_img)
        plt.title('Searched sliding windows')
        plt.subplot(122)
        plt.imshow(draw_img_detected, cmap='hot')
        plt.title('Detected vechicle windows')
        fig.tight_layout()
        plt.show()

    return bbox_list

def draw_labeled_bboxes(self, img, labels):
    """Draw one bounding box per labelled region onto *img* and return it.

    Args:
        img: image that is drawn on in place.
        labels: pair ``(label_array, count)``; region ``k`` (1..count) is
            made of the pixels where ``label_array == k``.

    Returns:
        The same *img* object, with a rectangle around every region.
    """
    # Iterate through all detected cars
    for car_number in range(1, labels[1]+1):
        # Find pixels with each car_number label value
        nonzero = (labels[0] == car_number).nonzero()
        # Identify x and y values of those pixels
        nonzeroy = np.array(nonzero[0])
        nonzerox = np.array(nonzero[1])
        # Define a bounding box based on min/max x and y; plain ints keep
        # cv2.rectangle from rejecting NumPy integer scalars.
        bbox = ((int(np.min(nonzerox)), int(np.min(nonzeroy))),
                (int(np.max(nonzerox)), int(np.max(nonzeroy))))
        # Draw the box on the image
        cv2.rectangle(img, bbox[0], bbox[1], (0,0,255), 6)
    # Return the image
    return img
def add_heat(self, heatmap, bbox_list):
    """Increment *heatmap* by one inside every box of *bbox_list*.

    Each box is expected in the form ``((x1, y1), (x2, y2))``. The heat map
    is modified in place and also returned.
    """
    for corners in bbox_list:
        left, top = corners[0][0], corners[0][1]
        right, bottom = corners[1][0], corners[1][1]
        # Rows are indexed by y, columns by x
        heatmap[top:bottom, left:right] += 1
    return heatmap

def apply_threshold(self, heatmap, threshold):
    """Zero every *heatmap* pixel whose value is ``<= threshold``.

    The heat map is modified in place and also returned.
    """
    too_cold = heatmap <= threshold
    heatmap[too_cold] = 0
    return heatmap
from scipy.ndimage.measurements import label

# Find final boxes from heatmap using label function
labels = label(heatmap)
if(plot==True):
    #print(labels[1], 'cars found')
    plt.imshow(labels[0], cmap='gray')
    plt.show()

管道处理一个图像
def process_image(image, plot=False):
    """Detect vehicles in one image and return it with boxes drawn on.

    Uses the module-level ``vehicle_detector``: sliding-window detections
    are accumulated into a heat map, pixels with heat ``<= 1`` are zeroed to
    suppress isolated false positives, the remaining regions are labelled
    and a bounding box is drawn around each.

    Args:
        image: image array indexed as ``[row, col, channel]``; boxes are
            drawn onto it by ``draw_labeled_bboxes``.
        plot: when truthy, show the intermediate results with matplotlib.

    Returns:
        The image returned by ``vehicle_detector.draw_labeled_bboxes``.
    """
    box_list = vehicle_detector.find_cars(image, plot=plot)

    # Builtin float replaces the np.float alias removed in NumPy 1.24
    heat = np.zeros_like(image[:,:,0]).astype(float)
    # Add heat to each box in box list
    heat = vehicle_detector.add_heat(heat, box_list)
    # Apply threshold to help remove false positives
    heat = vehicle_detector.apply_threshold(heat,1)
    # Visualize the heatmap when displaying
    heatmap = np.clip(heat, 0, 255)

    # Find final boxes from heatmap using label function
    labels = label(heatmap)
    if plot:
        #print(labels[1], 'cars found')
        plt.imshow(labels[0], cmap='gray')
        plt.show()

    new_image = vehicle_detector.draw_labeled_bboxes(image, labels)
    if plot:
        fig = plt.figure()
        plt.subplot(121)
        plt.imshow(new_image)
        plt.title('Car Positions')
        plt.subplot(122)
        plt.imshow(heatmap, cmap='hot')
        plt.title('Heat Map')
        fig.tight_layout()
        plt.show()
    return new_image

def process_test_images(vehicle_detector, plot=False):
    """Run ``process_image`` on every test image and write the results.

    Files matching ``TEST_DIRECTORY/TEST_FILENAME`` are processed and saved
    under the same base name in ``OUTPUT_DIRECTORY``.

    NOTE(review): ``process_image`` reads the module-level
    ``vehicle_detector``; the parameter of the same name is not forwarded.
    """
    test_filenames = glob.glob(TEST_DIRECTORY+'/'+TEST_FILENAME)
    # Process each test image
    for image_filename in test_filenames:
        # OpenCV loads BGR; convert to RGB for the pipeline and matplotlib
        image = cv2.imread(image_filename)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = process_image(image, plot)
        # Convert back to BGR, the channel order cv2.imwrite expects
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        # basename handles whichever path separator glob produced
        cv2.imwrite(OUTPUT_DIRECTORY+'/'+os.path.basename(image_filename), image)
process_image(image, plot=False)

在视频处理中使用了用于处理一个图像的相同流水线。每个帧都从视频中提取，由图像管道处理，并使用 VideoFileClip 和 ffmpeg 合并到最终的视频中。
from moviepy.editor import VideoFileClip

def process_video(video_filename, vehicle_detector, plot=False):
    """Run ``process_image`` on every frame of a video and save the result.

    Reads ``<video_filename>.mp4`` and writes ``<video_filename>_output.mp4``
    without audio.

    NOTE(review): ``vehicle_detector`` and ``plot`` are accepted but not
    used here; only ``process_image`` is handed to ``fl_image``.
    """
    source_clip = VideoFileClip(video_filename + ".mp4")
    annotated_clip = source_clip.fl_image(process_image)
    annotated_clip.write_videofile(video_filename + "_output.mp4", audio=False)

process_test_images(vehicle_detector, plot=False)