Merge pull request #6231 from shleeable/JobBatchv1-StatusPipeline

JobBatch v1- Status pipelines defensive checks
pull/6260/head
dansup 2 weeks ago committed by GitHub
commit 03e64efd68
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -10,6 +10,7 @@ use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;
class NewStatusPipeline implements ShouldQueue
@ -45,7 +46,15 @@ class NewStatusPipeline implements ShouldQueue
*/
public function handle()
{
if (!Status::where('id', $this->status->id)->exists()) {
$status = $this->status;
// Verify status exists
if (!$status) {
Log::info("NewStatusPipeline: Status no longer exists, skipping job");
return;
}
if (!Status::where('id', $status->id)->exists()) {
// The status has already been deleted by the time the job is running
// Don't publish the status, and just no-op
return;
@ -62,6 +71,12 @@ class NewStatusPipeline implements ShouldQueue
return;
}
}
StatusEntityLexer::dispatch($this->status);
try {
StatusEntityLexer::dispatch($status);
} catch (\Exception $e) {
Log::warning("NewStatusPipeline: Failed to dispatch StatusEntityLexer for status {$status->id}: " . $e->getMessage());
throw $e;
}
}
}

@ -29,6 +29,7 @@ use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
class RemoteStatusDelete implements ShouldBeUniqueUntilProcessing, ShouldQueue
{
@ -95,6 +96,12 @@ class RemoteStatusDelete implements ShouldBeUniqueUntilProcessing, ShouldQueue
{
$status = $this->status;
// Verify status exists
if (!$status) {
Log::info("RemoteStatusDelete: Status no longer exists, skipping job");
return;
}
if ($status->deleted_at) {
return;
}

@ -51,8 +51,21 @@ class StatusActivityPubDeliver implements ShouldQueue
public function handle()
{
$status = $this->status;
// Verify status exists
if (!$status) {
Log::info("StatusActivityPubDeliver: Status no longer exists, skipping job");
return;
}
$profile = $status->profile;
// Verify profile exists
if (!$profile) {
Log::info("StatusActivityPubDeliver: Profile no longer exists for status {$status->id}, skipping job");
return;
}
// ignore group posts
// if($status->group_id != null) {
// return;

@ -30,6 +30,7 @@ use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
use League\Fractal;
use League\Fractal\Serializer\ArraySerializer;
@ -68,7 +69,20 @@ class StatusDelete implements ShouldQueue
public function handle()
{
$status = $this->status;
$profile = $this->status->profile;
// Verify status exists
if (!$status) {
Log::info("StatusDelete: Status no longer exists, skipping job");
return;
}
$profile = $status->profile;
// Verify profile exists
if (!$profile) {
Log::info("StatusDelete: Profile no longer exists for status {$status->id}, skipping job");
return;
}
StatusService::del($status->id, true);
if ($profile) {

@ -23,6 +23,7 @@ use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
class StatusEntityLexer implements ShouldQueue
{
@ -58,9 +59,26 @@ class StatusEntityLexer implements ShouldQueue
*/
public function handle()
{
$profile = $this->status->profile;
$status = $this->status;
// Verify status exists
if (!$status) {
Log::info("StatusEntityLexer: Status no longer exists, skipping job");
return;
}
// Verify status has a profile
if (!$status->profile_id) {
Log::info("StatusEntityLexer: Status {$status->id} has no profile_id, skipping job");
return;
}
$profile = $status->profile;
if (!$profile) {
Log::info("StatusEntityLexer: Profile no longer exists for status {$status->id}, skipping job");
return;
}
if (in_array($status->type, ['photo', 'photo:album', 'video', 'video:album', 'photo:video:album'])) {
$profile->status_count = $profile->status_count + 1;
$profile->save();

@ -49,8 +49,21 @@ class StatusLocalUpdateActivityPubDeliverPipeline implements ShouldQueue
public function handle()
{
$status = $this->status;
// Verify status exists
if (!$status) {
Log::info("StatusLocalUpdateActivityPubDeliverPipeline: Status no longer exists, skipping job");
return;
}
$profile = $status->profile;
// Verify profile exists
if (!$profile) {
Log::info("StatusLocalUpdateActivityPubDeliverPipeline: Profile no longer exists for status {$status->id}, skipping job");
return;
}
// ignore group posts
// if($status->group_id != null) {
// return;

@ -15,6 +15,7 @@ use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Http;
use Illuminate\Support\Facades\Log;
use Purify;
class StatusRemoteUpdatePipeline implements ShouldQueue
@ -37,28 +38,51 @@ class StatusRemoteUpdatePipeline implements ShouldQueue
public function handle(): void
{
$activity = $this->activity;
// Verify activity exists and has required fields
if (!$activity) {
Log::info("StatusRemoteUpdatePipeline: Activity not found, skipping job");
return;
}
if (!isset($activity['id'])) {
Log::info("StatusRemoteUpdatePipeline: Invalid activity data, skipping job");
return;
}
$status = Status::with('media')->whereObjectUrl($activity['id'])->first();
if (! $status) {
Log::info("StatusRemoteUpdatePipeline: Status not found for activity {$activity['id']}, skipping job");
return;
}
$this->createPreviousEdit($status);
$this->updateMedia($status, $activity);
$this->updateImmediateAttributes($status, $activity);
$this->createEdit($status, $activity);
try {
$this->createPreviousEdit($status);
$this->updateMedia($status, $activity);
$this->updateImmediateAttributes($status, $activity);
$this->createEdit($status, $activity);
} catch (\Exception $e) {
Log::warning("StatusRemoteUpdatePipeline: Failed to update status {$status->id}: " . $e->getMessage());
throw $e;
}
}
protected function createPreviousEdit($status)
{
if (! $status->edits()->count()) {
StatusEdit::create([
'status_id' => $status->id,
'profile_id' => $status->profile_id,
'caption' => $status->caption,
'spoiler_text' => $status->cw_summary,
'is_nsfw' => $status->is_nsfw,
'ordered_media_attachment_ids' => $status->media()->orderBy('order')->pluck('id')->toArray(),
'created_at' => $status->created_at,
]);
try {
if (! $status->edits()->count()) {
StatusEdit::create([
'status_id' => $status->id,
'profile_id' => $status->profile_id,
'caption' => $status->caption,
'spoiler_text' => $status->cw_summary,
'is_nsfw' => $status->is_nsfw,
'ordered_media_attachment_ids' => $status->media()->orderBy('order')->pluck('id')->toArray(),
'created_at' => $status->created_at,
]);
}
} catch (\Exception $e) {
Log::warning("StatusRemoteUpdatePipeline: Failed to create previous edit for status {$status->id}: " . $e->getMessage());
throw $e;
}
}

@ -11,6 +11,7 @@ use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;
use App\Services\NotificationService;
use App\Services\StatusService;
@ -49,14 +50,36 @@ class StatusReplyPipeline implements ShouldQueue
public function handle()
{
$status = $this->status;
// Verify status exists
if (!$status) {
Log::info("StatusReplyPipeline: Status no longer exists, skipping job");
return 1;
}
// Verify status is a reply
if (!$status->in_reply_to_id) {
Log::info("StatusReplyPipeline: Status {$status->id} is not a reply, skipping job");
return 1;
}
$actor = $status->profile;
$reply = Status::find($status->in_reply_to_id);
if (!$actor) {
Log::info("StatusReplyPipeline: Actor profile no longer exists for status {$status->id}, skipping job");
return 1;
}
if(!$actor || !$reply) {
$reply = Status::find($status->in_reply_to_id);
if (!$reply) {
Log::info("StatusReplyPipeline: Reply status {$status->in_reply_to_id} no longer exists for status {$status->id}, skipping job");
return 1;
}
$target = $reply->profile;
if (!$target) {
Log::info("StatusReplyPipeline: Target profile no longer exists for reply {$reply->id}, skipping job");
return 1;
}
$exists = Notification::whereProfileId($target->id)
->whereActorId($actor->id)

@ -17,6 +17,7 @@ use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
class StatusTagsPipeline implements ShouldQueue
{
@ -47,6 +48,18 @@ class StatusTagsPipeline implements ShouldQueue
$res = $this->activity;
$status = $this->status;
// Verify status exists
if (!$status) {
Log::info("StatusTagsPipeline: Status no longer exists, skipping job");
return;
}
// Verify activity data exists
if (!$res || !isset($res['tag'])) {
Log::info("StatusTagsPipeline: No tag data in activity for status {$status->id}, skipping job");
return;
}
if (isset($res['tag']['type'], $res['tag']['name'])) {
$res['tag'] = [$res['tag']];
}

Loading…
Cancel
Save