當前位置:
首頁 > 科技 > 教程 | 如何使用DeepFake實現視頻換臉

教程 | 如何使用DeepFake實現視頻換臉


機器之心發布


作者:馮沁原




不久之前,AV 視頻換臉明星的 DeepFake 火了。這篇文章將一步步教你如何實現換臉。



 





如果你是第一次聽說 DeepFake,一定要點擊上面的視頻,親自感受一下尼古拉斯的臉是如何佔據全世界的每一個影片。




項目實戰




我們要如何實現視頻里的變臉呢?




因為視頻是連續的圖片,那麼我們只需要把每一張圖片中的臉切換了,就能得到變臉的新視頻了。那麼如何切換一個視頻中的圖片呢? 這需要我們 首先找到視頻中的臉,然後把臉進行切換。我們會發現,變臉這個難題可以拆解成如下的流程。







於是,在我們會在後續按照這五個步驟進行介紹。




視頻轉圖像


FFmpeg




FFmpeg 提供了處理音頻、視頻、字幕和相關源數據的工具庫。核心的庫包括:






  • libavcodec 提供了處理編碼的能力



  • libavformat 實現了流協議、容器類型、基本的 I/O 訪問 



  • libavutil 包括哈希、解壓縮等多樣的功能 



  • libavfilter 提供了鏈式修改音頻和視頻的能力 



  • libavdevice 提供了對設備訪問的抽象 



  • libswresample 實現了混音等能力 



  • libswscale 實現了顏色和尺度變換的能力




對外主要提供了三個工具:






  • ffmpeg 用來處理多媒體內容 



  • ffplay 是一個極簡的播放器 



  • ffprobe 是多媒體內容的分析工具




於是,我們的視頻轉圖片的功能,可以通過以下命令來實現,



ffmpeg -i clipname -vf fps=framerate -qscale:v

2

"imagename%04d.jpg"




具體來說,上面的指令可以把一個視頻,按照固定的頻率生成圖片。




人臉定位 




基本演算法




人臉定位是一個相對成熟的領域,主要應用 dlib 庫的相關功能。我們雖然可以定製一個人臉識別的演算法,但是我們也可以使用已有的通用的人臉識別 的函數庫。




有兩類演算法,一類是 HOG 的臉部標記演算法。






(來源: Facial landmarks with dlib, OpenCV, and Python) 




該演算法的效果如上圖。它將人臉分成了如下的區域:







  • 眼睛 (左/右)



  • 眉毛 (左/右)



  • 鼻子






  • 下巴 




基於這些標記,我們不僅能夠進行後續的換臉,也能檢測臉的具體形態,眨眼狀態等。例如,我們可以把這些點連在一起,得到更多的特徵。




(來源: Real-Time Face Pose Estimation ) 




尋找臉部標記是一個預測問題,輸入是一張圖片和興趣區域,輸出是興趣區域的關鍵點。




HOG 是如何找到人臉的呢? 這是一個通用的檢測演算法:







  • 從數據集中找到正樣本,並且計算 HOG 描述 



  • 從數據集中找到負樣本,並且計算 HOG 描述 



  • 基於 HOG 的描述使用分類演算法 



  • 在負樣本上在不同的起點和尺度進行分類,並且找到誤判的 HOG 



  • 基於上一步的負樣本,對模型進行重新的訓練




這裡有個問題,如何計算 HOG 的描述呢? 我們可以計算每個點的亮度,然後把每個點表示為指向更黑的方向的向量。如下圖所示:


 




 (來源: Machine Learning is Fun! Part 4: Modern Face Recognition with Deep Learning )







 (來源: Machine Learning is Fun! Part 4: Modern Face Recognition with Deep Learning )




 我們為什麼要這麼做呢? 因為每個點的絕對值會受到環境的影響,但是相對值則比較穩定。因此,我們通過梯度變化的表示,能夠準備出高質量的數據。當然,我們也可以進一步的把相鄰的點聚合在一起,從而產生更有代表性的數據。




現在可以進行檢測了






  • 首先在新的圖片上基於不同的起點和尺度尋找可行的區間;



  • 基於非極大抑制的方法來減少冗餘和重複的,下圖就是一個有冗餘和去除冗餘的情況,這個方法說白了就是找一個最大概率的矩陣去覆蓋掉和它過於重合的矩陣,並且不斷重複這個過程。






  (來源: Histogram of Oriented Gradients and Object Detection)




有了輪廓之後,我們可以找到臉部標記。尋找臉部標記的演算法是基於《One Millisecond Face Alignment with an Ensemble of Regression Trees》的論文。簡單來說,它利用了已經標記好的訓練集來訓練一個回歸樹的組合,從而用來預測。






 (來源: One Millisecond Face Alignment with an Ensemble of Regression Trees) 




在這個基礎上,就能夠標記出這 68 個點。






 (來源: Facial landmarks with dlib, OpenCV, and Python ) 




基於人臉的 68 個標記的坐標,可以計算人臉的?度,從而摳出擺正後的人臉。但是 dlib 要求識別的必須是全臉,因此會減少我們的樣本集以及一些特定的樣本場景。同時,因為人臉是 64*64 像素的尺寸,因此也要處理清晰度的問題。




另一種方法是用 CNN 訓練一個識別臉部的模型。CNN 能夠檢測更多的?度,但是需要更多的資源,並且可能在大文件上失效。




數據準備




我們的目標是把原始人臉轉換為目標人臉,因此我們需要收集原始人臉的圖片和目標人臉的圖片。如果你選擇的是一個名人,那麼可以直接用 Google image 得到你想要的圖片。雖然視頻中的圖片也能用,但是也可以收集一些多樣的數據。當然,我用的是我和我老婆的圖片,因此直接從我 們的 Photo 中導出即可。當人臉數據生成後,最好仔細檢查一下,避免不應該的臉或者其它的東東出現在你的訓練集中。




extract.py




Deepfake 用於定位人臉的演算法如下:



import

cv2

# 開源的計算機視覺庫

from

pathlib

import

Path

# 提供面向對象方式的文件訪問


from

tqdm

import

tqdm

# 提供進度條顯示功能


import

os

# 提供操作系統相關的訪問


import

numpy

as

np

# 提供科學計算相關的功能

from

lib.cli

import

DirectoryProcessor, rotate_image

# 處理一個目錄的文件,然後保存到新的目錄中;旋轉圖片,其實是在utils中


from

lib.utils

import

get_folder

# 獲得一個folder,不存在則創建


from

lib.multithreading

import

pool_process

# 多進程並發計算


from

lib.detect_blur

import

is_blurry

# 判斷圖片是否模糊


from

plugins.PluginLoader

import

PluginLoader

# 載入對應的演算法

class

ExtractTrainingData(DirectoryProcessor)

:

# 從訓練集提取頭像


   

def

create_parser(self, subparser, command, description)

:


       self.optional_arguments = self.get_optional_arguments()
       self.parser = subparser.add_parser(
           command,
           help=

"Extract the faces from a pictures."

,
           description=description,
           epilog=

"Questions and feedback:
           https://github.com/deepfakes/faceswap-playground"


           )

# 參數配置部分省略

   

def

process(self)

:


       extractor_name =

"Align"

# 對應的是Extract_Align.py


       self.extractor = PluginLoader.get_extractor(extractor_name)()
       processes = self.arguments.processes
       

try

:
           

if

processes !=

1

:

# 多進程處理圖片


               files = list(self.read_directory())
               

for

filename, faces

in

tqdm(pool_process(self.processFiles, files, processes=processes), total = len(files)):
                   self.num_faces_detected +=

1


                   self.faces_detected[os.path.basename(filename)] = faces
           

else

:

# 單進程處理圖片


               

for

filename

in

tqdm(self.read_directory()):
                   

try

:
                       image = cv2.imread(filename)
                       self.faces_detected[os.path.basename(filename)] = self.handleImage(image, filename)
                   

except

Exception

as

e:
                       

if

self.arguments.verbose:
                           print(

"Failed to extract from image: {}. Reason: {}"

.format(filename, e))
                       

pass


       

finally

:
           self.write_alignments()

   

def

processFiles(self, filename)

:

# 處理一個單獨的圖片的函數


       

try

:
           image = cv2.imread(filename)
           

return

filename, self.handleImage(image, filename)
       

except

Exception

as

e:
           

if

self.arguments.verbose:
               print(

"Failed to extract from image: {}. Reason: {}"

.format(filename, e))
           

pass


       

return

filename, []

   

def

getRotatedImageFaces(self, image, angle)

:

# 得到固定角度旋轉後的圖片的人臉


       rotated_image = rotate_image(image, angle)
       faces = self.get_faces(rotated_image, rotation=angle)
       rotated_faces = [(idx, face)

for

idx, face

in

faces]
       

return

rotated_faces, rotated_image

   

def

imageRotator(self, image)

:

# 得到一系列旋轉後的人臉


       

""" rotates the image through rotation_angles to try to find a face """


       

for

angle

in

self.rotation_angles:
           rotated_faces, rotated_image = self.getRotatedImageFaces(image, angle)
           

if

len(rotated_faces) >

0

:
               

if

self.arguments.verbose:
                   print(

"found face(s) by rotating image {} degrees"

.format(angle))
               

break


       

return

rotated_faces, rotated_image

   

def

handleImage(self, image, filename)

:


       faces = self.get_faces(image)
       process_faces = [(idx, face)

for

idx, face

in

faces]

       

# 沒有找到人臉,嘗試旋轉圖片


       

if

self.rotation_angles

is

not

None

and

len(process_faces) ==

0

:
           process_faces, image = self.imageRotator(image)

       rvals = []
       

for

idx, face

in

process_faces:
           

# 畫出人臉的標記


           

if

self.arguments.debug_landmarks:
               

for

(x, y)

in

face.landmarksAsXY():
                   cv2.circle(image, (x, y),

2

, (

0

,

0

,

255

),

-1

)

           resized_image, t_mat = self.extractor.extract(image, face,

256

, self.arguments.align_eyes)
           output_file = get_folder(self.output_dir) / Path(filename).stem

           

# 檢測圖片是否模糊


           

if

self.arguments.blur_thresh

is

not

None

:
               aligned_landmarks = self.extractor.transform_points(face.landmarksAsXY(), t_mat,

256

,

48

)
               feature_mask = self.extractor.get_feature_mask(aligned_landmarks /

256

,

256

,

48

)
               feature_mask = cv2.blur(feature_mask, (

10

,

10

))
               isolated_face = cv2.multiply(feature_mask, resized_image.astype(float)).astype(np.uint8)
               blurry, focus_measure = is_blurry(isolated_face, self.arguments.blur_thresh)
               

# print("{} focus measure: {}".format(Path(filename).stem, focus_measure))


               

# cv2.imshow("Isolated Face", isolated_face)


               

# cv2.waitKey(0)


               

# cv2.destroyAllWindows()


               

if

blurry:
                   print(

"{}"s focus measure of {} was below the blur threshold, moving to "blurry""

.format(Path(filename).stem, focus_measure))
                   output_file = get_folder(Path(self.output_dir) / Path(

"blurry"

)) / Path(filename).stem

           cv2.imwrite(

"{}_{}{}"

.format(str(output_file), str(idx), Path(filename).suffix), resized_image)

# 生成新圖片


           f = {
               

"r"

: face.r,
               

"x"

: face.x,
               

"w"

: face.w,
               

"y"

: face.y,
               

"h"

: face.h,
               

"landmarksXY"

: face.landmarksAsXY()
           }
           rvals.append(f)
       

return

rvals


 


注意,基於特徵標記的演算法對於傾斜的臉效果不好,也可以引入 CNN。




人臉轉換




人臉轉換的基本原理是什麼? 假設讓你盯著一個人的視頻連續看上 100 個小時,接著又給你看一眼另外一個人的照片,接著讓你憑著記憶畫出來剛才 的照片,你一定畫的會很像第一個人的。




我們使用的模型是 Autoencoder。有趣的是,這個模型所做的是基於原始的圖片再次生成原始的圖片。Autoencoder 的編碼器把圖片進行壓縮,而解 碼器把圖片進行還原,一個示例如下圖:






(來源: Building Autoencoders in Keras ) 




在這個基礎上,即使我們輸入的是另外一個人臉,也會被 Autoencoder 編碼成為一個類似原來的臉。





為了提升我們最終的效果,我們還需要把人臉共性相關的屬性和人臉特性相關的屬性進行學習。因此,我們對所有的臉都用一個統一的編碼器,這 個編碼器的目的是學習人臉共性的地方;然後,我們對每個臉有一個單獨的解碼器,這個解碼器是為了學習人臉個性的地方。這樣當你用 B 的臉通過 編碼器,再使用 A 的解碼器的話,你會得到一個與 B 的表情一致,但是 A 的臉。




這個過程用公式表示如下:




X

" = Decoder(Encoder(Shuffle(X)))
Loss = L1Loss(X"

-X)

A

" = Decoder_A(Encoder(Shuffle(A)))
Loss_A = L1Loss(A"

-A)

B

" = Decoder_B(Encoder(Shuffle(B)))
Loss_B = L1Loss(B"

-B)




具體來說,在訓練過程中,我們輸入 A 的圖片,通過編碼器和解碼器還原 A 的臉;然後我們輸入 B 的圖片,通過相同的編碼器但是不同的解碼器還原 B 的臉。不斷迭代這個過程,直到 loss 降低到一個閾值。在模型訓練的時候,我建議把 loss 降低到 0.02,這樣的效果會比較好。




這裡用的是比較標準的建模方式。值得注意的是,作者通過加入 PixelShuffler() 的函數把圖像進行了一定的扭曲,而這個扭曲增加了學習的難度,反 而讓模型能夠實現最終的效果。仔細想想這背後的道理,如果你一直在做簡單的題目,那麼必然不會有什麼解決難題的能力。但是,我只要把題目 做一些變體,就足以讓你成?。




因為在建模中使用的是原圖 A 的扭曲來還原 A,應用中是用 B 來還原 A,所以扭曲的方式會極大的影響到最終的結果。因此,如何選擇更好的扭曲方 式,也是一個重要的問題。




當我們圖片融合的時候,會有一個難題,如何又保證效果又防止圖片抖動。於是我們還要引入相關的演算法處理這些情況。於是我們可以知道,一個 看似直接的人臉轉換演算法在實際操作中需要考慮各種各樣的特殊情況,這才是真真的接地氣。




train.py




以下是進行訓練的演算法邏輯:



import

cv2

# 開源的計算機視覺庫


import

numpy

# 提供科學計算相關的功能


import

time

# 提供時間相關的功能

import

threading

# 提供多線程相關的功能


from

lib.utils

import

get_image_paths, get_folder

# 得到一個目錄下的圖片;獲得一個folder,不存在則創建


from

lib.cli

import

FullPaths, argparse, os, sys

from

plugins.PluginLoader

import

PluginLoader

# 載入對應的演算法

tf =

None


set_session =

None


def

import_tensorflow_keras()

:

# 在需要的時候載入TensorFlow和keras模塊


   

""" Import the TensorFlow and keras set_session modules only when they are required """


   

global

tf
   

global

set_session
   

if

tf

is

None

or

set_session

is

None

:
       

import

tensorflow
       

import

keras.backend.tensorflow_backend

# keras依賴底層的tensorflow實現具體的運算


       tf = tensorflow
       set_session = keras.backend.tensorflow_backend.set_session

class

TrainingProcessor(object)

:

# 訓練器


   arguments =

None

   

def

__init__(self, subparser, command, description=

"default"

)

:

# 初始化訓練器


       self.argument_list = self.get_argument_list()
       self.optional_arguments = self.get_optional_arguments()
       self.parse_arguments(description, subparser, command)
       self.lock = threading.Lock()

   

def

process_arguments(self, arguments)

:


       self.arguments = arguments
       print(

"Model A Directory: {}"

.format(self.arguments.input_A))
       print(

"Model B Directory: {}"

.format(self.arguments.input_B))
       print(

"Training data directory: {}"

.format(self.arguments.model_dir))

       self.process()

# 參數配置部分省略

   @staticmethod


   

def

get_optional_arguments()

:

# 創建一個存放參數的數組


       

""" Put the arguments in a list so that they are accessible from both argparse and gui """


       

# Override this for custom arguments


       argument_list = []
       

return

argument_list

   

def

parse_arguments(self, description, subparser, command)

:


       parser = subparser.add_parser(
           command,
           help=

"This command trains the model for the two faces A and B."

,
           description=description,
           epilog=

"Questions and feedback:
           https://github.com/deepfakes/faceswap-playground"

)

       

for

option

in

self.argument_list:
           args = option[

"opts"

]
           kwargs = {key: option[key]

for

key

in

option.keys()

if

key !=

"opts"

}
           parser.add_argument(*args, **kwargs)

       parser = self.add_optional_arguments(parser)
       parser.set_defaults(func=self.process_arguments)

   

def

add_optional_arguments(self, parser)

:


       

for

option

in

self.optional_arguments:
           args = option[

"opts"

]
           kwargs = {key: option[key]

for

key

in

option.keys()

if

key !=

"opts"

}
           parser.add_argument(*args, **kwargs)
       

return

parser

   

def

process(self)

:

# 具體的執行


       self.stop =

False


       self.save_now =

False

       thr = threading.Thread(target=self.processThread, args=(), kwargs={})

# 線程執行


       thr.start()

       

if

self.arguments.preview:
           print(

"Using live preview"

)
           

while

True

:
               

try

:
                   

with

self.lock:
                       

for

name, image

in

self.preview_buffer.items():
                           cv2.imshow(name, image)

                   key = cv2.waitKey(

1000

)
                   

if

key == ord(

"
"

)

or

key == ord(

"
"

):
                       

break


                   

if

key == ord(

"s"

):
                       self.save_now =

True


               

except

KeyboardInterrupt:
                   

break


       

else

:
           

try

:
               input()

# TODO how to catch a specific key instead of Enter?


               

# there isnt a good multiplatform solution: https://stackoverflow.com/questions/3523174/raw-input-in-python-without-pressing-enter


           

except

KeyboardInterrupt:
               

pass

       print(

"Exit requested! The trainer will complete its current cycle, save the models and quit (it can take up a couple of seconds depending on your training speed). If you want to kill it now, press Ctrl + c"

)
       self.stop =

True


       thr.join()

# waits until thread finishes

   

def

processThread(self)

:


       

try

:
           

if

self.arguments.allow_growth:
               self.set_tf_allow_growth()

           print(

"Loading data, this may take a while..."

)

# 載入數據


           

# this is so that you can enter case insensitive values for trainer


           trainer = self.arguments.trainer
           trainer =

"LowMem"

if

trainer.lower() ==

"lowmem"

else

trainer
           model = PluginLoader.get_model(trainer)(get_folder(self.arguments.model_dir), self.arguments.gpus)

# 讀取模型


           model.load(swapped=

False

)

           images_A = get_image_paths(self.arguments.input_A)

# 圖片A


           images_B = get_image_paths(self.arguments.input_B)

# 圖片B


           trainer = PluginLoader.get_trainer(trainer)

# 創建訓練器


           trainer = trainer(model, images_A, images_B, self.arguments.batch_size, self.arguments.perceptual_loss)

# 設置訓練器參數

           print(

"Starting. Press "Enter" to stop training and save model"

)

           

for

epoch

in

range(

0

, self.arguments.epochs):

               save_iteration = epoch % self.arguments.save_interval ==

0

               trainer.train_one_step(epoch, self.show

if

(save_iteration

or

self.save_now)

else

None

)

# 進行一步訓練

               

if

save_iteration:
                   model.save_weights()

               

if

self.stop:
                   

break

               

if

self.save_now:
                   model.save_weights()
                   self.save_now =

False

           model.save_weights()
           exit(

0

)
       

except

KeyboardInterrupt:
           

try

:
               model.save_weights()
           

except

KeyboardInterrupt:
               print(

"Saving model weights has been cancelled!"

)
           exit(

0

)
       

except

Exception

as

e:
           

raise

e
           exit(

1

)

   

def

set_tf_allow_growth(self)

:


       import_tensorflow_keras()
       config = tf.ConfigProto()
       config.gpu_options.allow_growth =

True


       config.gpu_options.visible_device_list=

"0"


       set_session(tf.Session(config=config))

   preview_buffer = {}

   

def

show(self, image, name=

""

)

:

# 提供預覽


       

try

:
           

if

self.arguments.redirect_gui:
               scriptpath = os.path.realpath(os.path.dirname(sys.argv[

0

]))
               img =

".gui_preview.png"


               imgfile = os.path.join(scriptpath, img)
               cv2.imwrite(imgfile, image)
           

elif

self.arguments.preview:
               

with

self.lock:
                   self.preview_buffer[name] = image
           

elif

self.arguments.write_image:
               cv2.imwrite(

"_sample_{}.jpg"

.format(name), image)
       

except

Exception

as

e:
           print(

"could not preview sample"

)
           

raise

e




Trainer.py




以下實現了一次具體的訓練:



import

time

import

numpy

from

lib.training_data

import

TrainingDataGenerator, stack_images

class

Trainer()

:


   random_transform_args = {

# 初始化參數


       

"rotation_range"

:

10

,
       

"zoom_range"

:

0.05

,
       

"shift_range"

:

0.05

,
       

"random_flip"

:

0.4

,
   }

   

def

__init__(self, model, fn_A, fn_B, batch_size, *args)

:


       self.batch_size = batch_size
       self.model = model

       generator = TrainingDataGenerator(self.random_transform_args,

160

)

# 讀取需要的數據


       self.images_A = generator.minibatchAB(fn_A, self.batch_size)
       self.images_B = generator.minibatchAB(fn_B, self.batch_size)

   

def

train_one_step(self, iter, viewer)

:

# 訓練一步


       epoch, warped_A, target_A = next(self.images_A)
       epoch, warped_B, target_B = next(self.images_B)

       loss_A = self.model.autoencoder_A.train_on_batch(warped_A, target_A)

# 計算損失


       loss_B = self.model.autoencoder_B.train_on_batch(warped_B, target_B)
       print(

"[{0}] [#{1:05d}] loss_A: {2:.5f}, loss_B: {3:.5f}"

.format(time.strftime(

"%H:%M:%S"

), iter, loss_A, loss_B),
           end=

"
"

)

       

if

viewer

is

not

None

:
           viewer(self.show_sample(target_A[

0

:

14

], target_B[

0

:

14

]),

"training"

)

   

def

show_sample(self, test_A, test_B)

:


       figure_A = numpy.stack([
           test_A,
           self.model.autoencoder_A.predict(test_A),
           self.model.autoencoder_B.predict(test_A),
       ], axis=

1

)
       figure_B = numpy.stack([
           test_B,
           self.model.autoencoder_B.predict(test_B),
           self.model.autoencoder_A.predict(test_B),
       ], axis=

1

)

       

if

test_A.shape[

0

] %

2

==

1

:
           figure_A = numpy.concatenate ([figure_A, numpy.expand_dims(figure_A[

0

],

0

) ])
           figure_B = numpy.concatenate ([figure_B, numpy.expand_dims(figure_B[

0

],

0

) ])

       figure = numpy.concatenate([figure_A, figure_B], axis=

0

)
       w =

4


       h = int( figure.shape[

0

] / w)
       figure = figure.reshape((w, h) + figure.shape[

1

:])
       figure = stack_images(figure)

       

return

numpy.clip(figure *

255

,

0

,

255

).astype(

"uint8"

)






AutoEncoder.py




以下是我們使用的AutoEncoder的演算法邏輯:



# AutoEncoder的基礎類

import

os, shutil

encoderH5 =

"encoder.h5"


decoder_AH5 =

"decoder_A.h5"


decoder_BH5 =

"decoder_B.h5"

class

AutoEncoder

:


   

def

__init__(self, model_dir, gpus)

:


       self.model_dir = model_dir
       self.gpus = gpus

       self.encoder = self.Encoder()
       self.decoder_A = self.Decoder()
       self.decoder_B = self.Decoder()

       self.initModel()

   

def

load(self, swapped)

:


       (face_A,face_B) = (decoder_AH5, decoder_BH5)

if

not

swapped

else

(decoder_BH5, decoder_AH5)

       

try

:

# 載入權重


           self.encoder.load_weights(str(self.model_dir / encoderH5))
           self.decoder_A.load_weights(str(self.model_dir / face_A))
           self.decoder_B.load_weights(str(self.model_dir / face_B))
           print(

"loaded model weights"

)
           

return

True


       

except

Exception

as

e:
           print(

"Failed loading existing training data."

)
           print(e)
           

return

False

   

def

save_weights(self)

:

# 存儲權重


       model_dir = str(self.model_dir)
       

if

os.path.isdir(model_dir +

"_bk"

):
           shutil.rmtree(model_dir +

"_bk"

)
       shutil.move(model_dir,  model_dir +

"_bk"

)
       os.mkdir(model_dir)
       self.encoder.save_weights(str(self.model_dir / encoderH5))
       self.decoder_A.save_weights(str(self.model_dir / decoder_AH5))
       self.decoder_B.save_weights(str(self.model_dir / decoder_BH5))
       print(

"saved model weights"

)




Model.py




以下是我們的具體模型:



# Based on the original https://www.reddit.com/r/deepfakes/ code sample + contribs

from

keras.models

import

Model

as

KerasModel

from

keras.layers

import

Input, Dense, Flatten, Reshape

from

keras.layers.advanced_activations

import

LeakyReLU

from

keras.layers.convolutional

import

Conv2D

from

keras.optimizers

import

Adam

from

.AutoEncoder

import

AutoEncoder

from

lib.PixelShuffler

import

PixelShuffler

from

keras.utils

import

multi_gpu_model

IMAGE_SHAPE = (

64

,

64

,

3

)
ENCODER_DIM =

1024

class

Model(AutoEncoder)

:


   

def

initModel(self)

:


       optimizer = Adam(lr=

5e-5

, beta_1=

0.5

, beta_2=

0.999

)  

# 深入理解Adam的優化


       x = Input(shape=IMAGE_SHAPE)

       self.autoencoder_A = KerasModel(x, self.decoder_A(self.encoder(x)))
       self.autoencoder_B = KerasModel(x, self.decoder_B(self.encoder(x)))

       

if

self.gpus >

1

:
           self.autoencoder_A = multi_gpu_model( self.autoencoder_A , self.gpus)
           self.autoencoder_B = multi_gpu_model( self.autoencoder_B , self.gpus)

       self.autoencoder_A.compile(optimizer=optimizer, loss=

"mean_absolute_error"

)
       self.autoencoder_B.compile(optimizer=optimizer, loss=

"mean_absolute_error"

)

   

def

converter(self, swap)

:


       autoencoder = self.autoencoder_B

if

not

swap

else

self.autoencoder_A
       

return

lambda

img: autoencoder.predict(img)

   

def

conv(self, filters)

:


       

def

block(x)

:


           x = Conv2D(filters, kernel_size=

5

, strides=

2

, padding=

"same"

)(x)
           x = LeakyReLU(

0.1

)(x)
           

return

x
       

return

block

   

def

upscale(self, filters)

:


       

def

block(x)

:


           x = Conv2D(filters *

4

, kernel_size=

3

, padding=

"same"

)(x)
           x = LeakyReLU(

0.1

)(x)

# 使用 LeakyReLU 激活函數


           x = PixelShuffler()(x)

# 將filter的大小變為原來的1/4,讓高和寬變為原來的兩倍


           

return

x
       

return

block

   

def

Encoder(self)

:


       input_ = Input(shape=IMAGE_SHAPE)
       x = input_
       x = self.conv(

128

)(x)
       x = self.conv(

256

)(x)
       x = self.conv(

512

)(x)
       x = self.conv(

1024

)(x)
       x = Dense(ENCODER_DIM)(Flatten()(x))
       x = Dense(

4

*

4

*

1024

)(x)
       x = Reshape((

4

,

4

,

1024

))(x)
       x = self.upscale(

512

)(x)
       

return

KerasModel(input_, x)

   

def

Decoder(self)

:


       input_ = Input(shape=(

8

,

8

,

512

))
       x = input_
       x = self.upscale(

256

)(x)
       x = self.upscale(

128

)(x)
       x = self.upscale(

64

)(x)
       x = Conv2D(

3

, kernel_size=

5

, padding=

"same"

, activation=

"sigmoid"

)(x)
       

return

KerasModel(input_, x)




整個網路的結構如下:






 來源: 刷爆朋友圈的視頻人物換臉是怎樣煉成的?





我們可以看出來,經歷了四個卷積層、展開層、全連接層,我們開始 upscale 整個模型。在我們 upscale 一半的時候,我們把 encoder 和 decoder 進行


了切割,從而保證了共性和個性的分離。




convert.py




在訓練的基礎上,我們現在可以進行圖片的轉換了。



import

cv2

import

re

import

os

from

pathlib

import

Path

from

tqdm

import

tqdm

from

lib.cli

import

DirectoryProcessor, FullPaths

from

lib.utils

import

BackgroundGenerator, get_folder, get_image_paths, rotate_image

from

plugins.PluginLoader

import

PluginLoader

class

ConvertImage(DirectoryProcessor)

:


   filename =

""


   

def

create_parser(self, subparser, command, description)

:


       self.optional_arguments = self.get_optional_arguments()
       self.parser = subparser.add_parser(
           command,
           help=

"Convert a source image to a new one with the face swapped."

,
           description=description,
           epilog=

"Questions and feedback:
           https://github.com/deepfakes/faceswap-playground"


       )

# 參數配置部分省略

   

def

process(self)

:

# 進行模型的轉換和拼接


       

# Original & LowMem models go with Adjust or Masked converter


       

#

Note:

GAN prediction outputs a mask + an image, while other predicts only an image


       model_name = self.arguments.trainer
       conv_name = self.arguments.converter
       self.input_aligned_dir =

None

       model = PluginLoader.get_model(model_name)(get_folder(self.arguments.model_dir), self.arguments.gpus)
       

if

not

model.load(self.arguments.swap_model):
           print(

"Model Not Found! A valid model must be provided to continue!"

)
           exit(

1

)

       input_aligned_dir = Path(self.arguments.input_dir)/Path(

"aligned"

)
       

if

self.arguments.input_aligned_dir

is

not

None

:
           input_aligned_dir = self.arguments.input_aligned_dir
       

try

:
           self.input_aligned_dir = [Path(path)

for

path

in

get_image_paths(input_aligned_dir)]
           

if

len(self.input_aligned_dir) ==

0

:
               print(

"Aligned directory is empty, no faces will be converted!"

)
           

elif

len(self.input_aligned_dir) <= len(self.input_dir)/

3

:
               print(

"Aligned directory contains an amount of images much less than the input, are you sure this is the right directory?"

)
       

except

:
           print(

"Aligned directory not found. All faces listed in the alignments file will be converted."

)

       converter = PluginLoader.get_converter(conv_name)(model.converter(

False

),
           trainer=self.arguments.trainer,
           blur_size=self.arguments.blur_size,
           seamless_clone=self.arguments.seamless_clone,
           sharpen_image=self.arguments.sharpen_image,
           mask_type=self.arguments.mask_type,
           erosion_kernel_size=self.arguments.erosion_kernel_size,
           match_histogram=self.arguments.match_histogram,
           smooth_mask=self.arguments.smooth_mask,
           avg_color_adjust=self.arguments.avg_color_adjust
       )

       batch = BackgroundGenerator(self.prepare_images(),

1

)

       

# frame ranges stuff...


       self.frame_ranges =

None

       

# split out the frame ranges and parse out "min" and "max" values


       minmax = {
           

"min"

:

0

,

# never any frames less than 0


           

"max"

: float(

"inf"

)
       }

       

if

self.arguments.frame_ranges:
           self.frame_ranges = [tuple(map(

lambda

q: minmax[q]

if

q

in

minmax.keys()

else

int(q), v.split(

"-"

)))

for

v

in

self.arguments.frame_ranges]

       

# last number regex. I know regex is hacky, but its reliablyhacky(tm).


       self.imageidxre = re.compile(

r"(d+)(?!.*d)"

)

       

for

item

in

batch.iterator():
           self.convert(converter, item)

   

def

check_skipframe(self, filename)

:


       

try

:
           idx = int(self.imageidxre.findall(filename)[

0

])
           

return

not

any(map(

lambda

b: b[

0

]<=idx<=b[

1

], self.frame_ranges))
       

except

:
           

return

False

   

def

check_skipface(self, filename, face_idx)

:


       aligned_face_name =

"{}_{}{}"

.format(Path(filename).stem, face_idx, Path(filename).suffix)
       aligned_face_file = Path(self.arguments.input_aligned_dir) / Path(aligned_face_name)
       

#

TODO:

Remove this temporary fix for backwards compatibility of filenames


       bk_compat_aligned_face_name =

"{}{}{}"

.format(Path(filename).stem, face_idx, Path(filename).suffix)
       bk_compat_aligned_face_file = Path(self.arguments.input_aligned_dir) / Path(bk_compat_aligned_face_name)
       

return

aligned_face_file

not

in

self.input_aligned_dir

and

bk_compat_aligned_face_file

not

in

self.input_aligned_dir

   

def

convert(self, converter, item)

:


       

try

:
           (filename, image, faces) = item

           skip = self.check_skipframe(filename)
           

if

self.arguments.discard_frames

and

skip:
               

return

           

if

not

skip:

# process frame as normal


               

for

idx, face

in

faces:
                   

if

self.input_aligned_dir

is

not

None

and

self.check_skipface(filename, idx):
                       

print

(

"face {} for frame {} was deleted, skipping"

.format(idx, os.path.basename(filename)))
                       

continue


                   

# Check for image rotations and rotate before mapping face


                   

if

face.r !=

0

:
                       height, width = image.shape[:

2

]
                       image = rotate_image(image, face.r)
                       image = converter.patch_image(image, face,

64

if

"128"

not

in

self.arguments.trainer

else

128

)
                       

#

TODO:

This switch between 64 and 128 is a hack for now. We should have a separate cli option for size


                       image = rotate_image(image, face.r *

-1

, rotated_width=width, rotated_height=height)
                   

else

:
                       image = converter.patch_image(image, face,

64

if

"128"

not

in

self.arguments.trainer

else

128

)
                       

#

TODO:

This switch between 64 and 128 is a hack for now. We should have a separate cli option for size

           output_file = get_folder(self.output_dir) / Path(filename).name
           cv2.imwrite(str(output_file), image)
       

except

Exception

as

e:
           print(

"Failed to convert image: {}. Reason: {}"

.format(filename, e))

   

def

prepare_images(self)

:


       self.read_alignments()
       is_have_alignments = self.have_alignments()
       

for

filename

in

tqdm(self.read_directory()):
           image = cv2.imread(filename)

           

if

is_have_alignments:
               

if

self.have_face(filename):
                   faces = self.get_faces_alignments(filename, image)
               

else

:
                   tqdm.write (

"no alignment found for {}, skipping"

.format(os.path.basename(filename)))
                   

continue


           

else

:
               faces = self.get_faces(image)
           

yield

filename, image, faces




 當然我們也可以用 GAN 演算法進行優化,那麼讓我們看一下使用 GAN 的模型。






 (來源: shaoanlu/faceswap-GAN) 




如上圖所示,我們首先扣取 A 的人臉,然後進行變形,之後經歷編碼和解碼生成了重建的臉和 Mask。以下是我們的學習目標。






 (來源: shaoanlu/faceswap-GAN) 




從圖片到視頻




基於我們 FFmpeg 的講解,可以使用以下命令將一批圖片合併為一個視頻:



ffmpeg  -f image2 -i imagename%

04

d.jpg -vcodec libx264 -crf

15

-pix_fmt yuv420p output_filename.mp4




如果你希望新生成的視頻有聲音,那就可以在最後把有聲音的視頻中的聲音拼接到你最後產生的目標視頻上即可。




雲平台部署




 我們可以在 Google Cloud 中部署雲平台。具體請看視頻展示,我在這裡展示幾個關鍵步驟:






 (來源: How to Create DeepFakes with Google Cloud GPU Services)








 (來源: How to Create DeepFakes with Google Cloud GPU Services)






 (來源: How to Create DeepFakes with Google Cloud GPU Services)






 (來源: How to Create DeepFakes with Google Cloud GPU Services) 




最後是我在 Google Cloud 上進行 Training 的一個截圖。


 





項目架構




最後讓我們從高層理解一下整個 DeepFake 項目的架構。







社會影響




我們已經聊了 Deepfake 的原理,那麼它到底有哪些真正的社會價值呢? 我們可以用任何人來拍攝一個電影,然後變成我們想要的任何人。我們可以 創建更加真實的虛擬人物。穿衣購物可以更加真人模擬。




總結




我們用到了如下的技術棧、框架、平台:






  • Dlib:基於 C++的機器學習演算法庫 OpenCV:計算機視覺演算法庫 Keras:在底層機器學習框架之上的高級 API 架構 TensorFlow:Google 開源的機器學習演算法框架 CUDA:Nvidia 提供的針對 GPU 加速的開發環境



  • Google Cloud Platform:Google 提供的雲計算服務平台 Virtualenv:創建獨立的 Python 環境 FFmpeg:多媒體音視頻處理開源庫



  • 現在就來上手,把你心愛的另一半的人臉搬上好萊塢吧。






本文為機器之心發布,

轉載請聯繫本公眾號獲得授權



?------------------------------------------------


加入機器之心(全職記者/實習生):hr@jiqizhixin.com


投稿或尋求報道:editor@jiqizhixin.com


廣告&商務合作:bd@jiqizhixin.com

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 機器之心 的精彩文章:

PyTorch為何如此高效好用?來探尋深度學習框架的內部架構
觀點 | 1cycle策略:實踐中的學習率設定應該是先增再降

TAG:機器之心 |