From ddc768871b907ebd3f605ff9b0c8628671a9596b Mon Sep 17 00:00:00 2001 From: Daniel Supernault Date: Tue, 16 Feb 2021 23:39:39 -0700 Subject: [PATCH] Update federation pipeline, add locks --- app/Jobs/InboxPipeline/InboxValidator.php | 9 +++ app/Jobs/InboxPipeline/InboxWorker.php | 9 +++ app/Util/ActivityPub/Helpers.php | 97 +++++++++++++---------- 3 files changed, 75 insertions(+), 40 deletions(-) diff --git a/app/Jobs/InboxPipeline/InboxValidator.php b/app/Jobs/InboxPipeline/InboxValidator.php index bfcb4d6d0..4b85a1bd0 100644 --- a/app/Jobs/InboxPipeline/InboxValidator.php +++ b/app/Jobs/InboxPipeline/InboxValidator.php @@ -53,6 +53,15 @@ class InboxValidator implements ShouldQueue $profile = Profile::whereNull('domain')->whereUsername($username)->first(); + if(isset($payload['id'])) { + $lockKey = hash('sha256', $payload['id']); + if(Cache::get($lockKey) !== null) { + // Job processed already + return 1; + } + Cache::put($lockKey, 1, 300); + } + if(!isset($headers['signature']) || !isset($headers['date'])) { return; } diff --git a/app/Jobs/InboxPipeline/InboxWorker.php b/app/Jobs/InboxPipeline/InboxWorker.php index acc72f16f..ad3a085b5 100644 --- a/app/Jobs/InboxPipeline/InboxWorker.php +++ b/app/Jobs/InboxPipeline/InboxWorker.php @@ -49,6 +49,15 @@ class InboxWorker implements ShouldQueue $headers = $this->headers; $payload = json_decode($this->payload, true, 8); + if(isset($payload['id'])) { + $lockKey = hash('sha256', $payload['id']); + if(Cache::get($lockKey) !== null) { + // Job processed already + return 1; + } + Cache::put($lockKey, 1, 300); + } + if(!isset($headers['signature']) || !isset($headers['date'])) { return; } diff --git a/app/Util/ActivityPub/Helpers.php b/app/Util/ActivityPub/Helpers.php index 1f127eb99..583cd8dc0 100644 --- a/app/Util/ActivityPub/Helpers.php +++ b/app/Util/ActivityPub/Helpers.php @@ -346,27 +346,41 @@ class Helpers { $reply_to = null; } $ts = is_array($res['published']) ? $res['published'][0] : $res['published']; - $status = DB::transaction(function() use($profile, $res, $url, $ts, $reply_to, $cw, $scope, $id) { - $status = new Status; - $status->profile_id = $profile->id; - $status->url = isset($res['url']) ? $res['url'] : $url; - $status->uri = isset($res['url']) ? $res['url'] : $url; - $status->object_url = $id; - $status->caption = strip_tags($res['content']); - $status->rendered = Purify::clean($res['content']); - $status->created_at = Carbon::parse($ts); - $status->in_reply_to_id = $reply_to; - $status->local = false; - $status->is_nsfw = $cw; - $status->scope = $scope; - $status->visibility = $scope; - $status->cw_summary = $cw == true && isset($res['summary']) ? - Purify::clean(strip_tags($res['summary'])) : null; - $status->save(); - if($reply_to == null) { - self::importNoteAttachment($res, $status); - } - return $status; + + $statusLockKey = 'helpers:status-lock:' . hash('sha256', $res['id']); + $status = Cache::lock($statusLockKey) + ->get(function () use( + $profile, + $res, + $url, + $ts, + $reply_to, + $cw, + $scope, + $id + ) { + return DB::transaction(function() use($profile, $res, $url, $ts, $reply_to, $cw, $scope, $id) { + $status = new Status; + $status->profile_id = $profile->id; + $status->url = isset($res['url']) ? $res['url'] : $url; + $status->uri = isset($res['url']) ? $res['url'] : $url; + $status->object_url = $id; + $status->caption = strip_tags($res['content']); + $status->rendered = Purify::clean($res['content']); + $status->created_at = Carbon::parse($ts); + $status->in_reply_to_id = $reply_to; + $status->local = false; + $status->is_nsfw = $cw; + $status->scope = $scope; + $status->visibility = $scope; + $status->cw_summary = $cw == true && isset($res['summary']) ? + Purify::clean(strip_tags($res['summary'])) : null; + $status->save(); + if($reply_to == null) { + self::importNoteAttachment($res, $status); + } + return $status; + }); }); return $status; @@ -458,25 +472,28 @@ class Helpers { $profile = Profile::whereRemoteUrl($res['id'])->first(); if(!$profile) { - $profile = DB::transaction(function() use($domain, $webfinger, $res, $runJobs) { - $profile = new Profile(); - $profile->domain = strtolower($domain); - $profile->username = strtolower(Purify::clean($webfinger)); - $profile->name = isset($res['name']) ? Purify::clean($res['name']) : 'user'; - $profile->bio = isset($res['summary']) ? Purify::clean($res['summary']) : null; - $profile->sharedInbox = isset($res['endpoints']) && isset($res['endpoints']['sharedInbox']) ? $res['endpoints']['sharedInbox'] : null; - $profile->inbox_url = strtolower($res['inbox']); - $profile->outbox_url = strtolower($res['outbox']); - $profile->remote_url = strtolower($res['id']); - $profile->public_key = $res['publicKey']['publicKeyPem']; - $profile->key_id = $res['publicKey']['id']; - $profile->webfinger = strtolower(Purify::clean($webfinger)); - $profile->last_fetched_at = now(); - $profile->save(); - if(config('pixelfed.cloud_storage') == true) { - RemoteAvatarFetch::dispatch($profile); - } - return $profile; + $profileLockKey = 'helpers:profile-lock:' . hash('sha256', $res['id']); + $profile = Cache::lock($profileLockKey)->get(function () use($domain, $webfinger, $res, $runJobs) { + return DB::transaction(function() use($domain, $webfinger, $res, $runJobs) { + $profile = new Profile(); + $profile->domain = strtolower($domain); + $profile->username = strtolower(Purify::clean($webfinger)); + $profile->name = isset($res['name']) ? Purify::clean($res['name']) : 'user'; + $profile->bio = isset($res['summary']) ? Purify::clean($res['summary']) : null; + $profile->sharedInbox = isset($res['endpoints']) && isset($res['endpoints']['sharedInbox']) ? $res['endpoints']['sharedInbox'] : null; + $profile->inbox_url = strtolower($res['inbox']); + $profile->outbox_url = strtolower($res['outbox']); + $profile->remote_url = strtolower($res['id']); + $profile->public_key = $res['publicKey']['publicKeyPem']; + $profile->key_id = $res['publicKey']['id']; + $profile->webfinger = strtolower(Purify::clean($webfinger)); + $profile->last_fetched_at = now(); + $profile->save(); + if(config('pixelfed.cloud_storage') == true) { + RemoteAvatarFetch::dispatch($profile); + } + return $profile; + }); }); } else { // Update info after 24 hours