import { Injectable } from '@angular/core';
import type { SlowLaneDataRowMessage } from '@team-rocos/rocos-js';
import { StreamQuery } from '@team-rocos/rocos-js';
import type { Observable } from 'rxjs';
import { Subject } from 'rxjs';
import { map } from 'rxjs/operators';
import { RocosClientService } from '../rocos-client';

/**
 * Application-wide service that searches robot data streams through the
 * Rocos client and, alongside every search, probes whether the queried
 * streams contain rows dated in the future.
 */
@Injectable({
  providedIn: 'root',
})
export class SerachService {
  /**
   * Emits the result of the future-data probe started by each call to
   * `searchStream`: `true` when every returned entry has zero rows after the
   * probe's start date, `false` otherwise.
   *
   * Some out-of-sync robots produce streams with inconsistent dates (in the
   * future), which makes those streams impractical to explore.
   */
  public isStreamsValid: Subject<boolean> = new Subject();

  constructor(private rocosClientService: RocosClientService) {}

  /**
   * Searches the streams matching `query`.
   *
   * As a side effect, a future-data probe derived from `query` is subscribed
   * to immediately; its outcome is published on `isStreamsValid`.
   *
   * @param query Search criteria forwarded unchanged to the Rocos client.
   * @returns Observable of the `data` array carried by the search response.
   */
  public searchStream(query: StreamQuery): Observable<SlowLaneDataRowMessage[]> {
    this.checkInvalidStream(query);
    return this.getStreams(query);
  }

  /**
   * Runs `query` against the Rocos search client and unwraps the response.
   *
   * @param query Search criteria passed to the client's `searchStream`.
   * @returns Observable emitting the response's `data` field.
   */
  private getStreams(query: StreamQuery): Observable<SlowLaneDataRowMessage[]> {
    const response$ = this.rocosClientService.rocosClient.search.searchStream(query);

    // NOTE(review): the `any[]` assertion bypasses type checking of the payload;
    // confirm the actual response type before narrowing it.
    return response$.pipe(map((res) => res.data as any[]));
  }

  /**
   * Probes for rows dated in the future and publishes the verdict on
   * `isStreamsValid`.
   *
   * The probe is a copy of `query` whose window starts ten minutes from now,
   * has no end date and uses a `'1y'` interval.
   *
   * NOTE(review): the subscription has no error callback, so a failing probe
   * is not handled here.
   *
   * @param query Search criteria the probe is derived from; it is not mutated.
   */
  private checkInvalidStream(query: StreamQuery): void {
    const tenMinutesMs = 10 * 60 * 1000;
    const now = Date.now();

    // Copy the caller's criteria onto a fresh instance so `query` stays intact.
    const probe: StreamQuery = Object.assign(new StreamQuery(), query);
    probe.startDate = now + tenMinutesMs;
    probe.interval = '1y';
    delete probe.endDate;

    // Valid means that no entry carries any row inside the future window.
    const noFutureRows$ = this.getStreams(probe).pipe(
      map((entries) => !entries.some((entry) => entry.rows.length !== 0)),
    );

    noFutureRows$.subscribe((valid) => {
      this.isStreamsValid.next(valid);
    });
  }
}
